Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
61b525f6d856e3ccf47806b992f43124bcc0a2f8
[simgrid.git] / examples / gras / p2p / chord / chord.c
1 /*
2  * vim:ts=2:sw=2:expandtab
3  */
4
5 #include <stdio.h>
6 #include "xbt/sysdep.h"
7 #include "gras.h"
8
9 static int closest_preceding_node(int id);
10 static void check_predecessor(void);
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(chord,"Messages specific to this example");
13
14 typedef enum msg_typus{
15   PING,
16   PONG,
17   GET_PRE,
18   REP_PRE,
19   GET_SUC,
20   REP_SUC,
21   STD,
22 }msg_typus;
23
24 /*GRAS_DEFINE_TYPE(s_pbio,
25   struct s_pbio{
26     msg_typus type;
27     int dest;
28     char msg[1024];
29   };
30 );
31 typedef struct s_pbio pbio_t;*/
32
33 /*GRAS_DEFINE_TYPE(s_ping,*/
34   struct s_ping{
35     int id;
36   };
37 /*);*/
38 typedef struct s_ping ping_t;
39
40 /*GRAS_DEFINE_TYPE(s_pong,*/
41   struct s_pong{
42     int id;
43     int failed;
44   };
45 /*);*/
46 typedef struct s_pong pong_t;
47
48 GRAS_DEFINE_TYPE(s_notify,
49   struct s_notify{
50     int id;
51     char host[1024];
52     int port;
53   };
54 );
55 typedef struct s_notify notify_t;
56
57 GRAS_DEFINE_TYPE(s_get_suc,
58   struct s_get_suc{
59     int id;
60   };
61 );
62 typedef struct s_get_suc get_suc_t;
63
64 GRAS_DEFINE_TYPE(s_rep_suc,
65   struct s_rep_suc{
66     int id;
67     char host[1024];
68     int port;
69   };
70 );
71 typedef struct s_rep_suc rep_suc_t;
72
73 typedef struct finger_elem{
74   int id;
75   char host[1024];
76   int port;
77 }finger_elem;
78
79
80
81 static void register_messages(){
82 /*  gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
83   gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc));
84   gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc));
85   gras_msgtype_declare("chord_notify",gras_datadesc_by_symbol(s_notify));
86 }
87
88 /* Global private data */
89 typedef struct{
90   gras_socket_t sock; /* server socket on which I'm listening */
91   int id; /* my id number */
92   char host[1024]; /* my host name */
93   int port; /* port on which I'm listening FIXME */
94   int fingers; /* how many fingers */
95   finger_elem *finger; /* finger table */
96   int next_to_fix; /* next in the finger list to be checked */
97   int pre_id; /* predecessor id */
98   char pre_host[1024]; /* predecessor host */
99   int pre_port; /* predecessor port */
100 }node_data_t;
101
102
103 int node(int argc,char **argv);
104
105 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
106   xbt_ex_t e;
107   pbio_t pbio_i=*(pbio_t*)payload_data;
108
109   node_data_t *globals=(node_data_t*)gras_userdata_get();
110
111   INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
112
113
114
115 }*/
116
117 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
118   gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);   
119   xbt_ex_t e;
120   get_suc_t incoming=*(get_suc_t*)payload_data;
121   rep_suc_t outgoing;
122   node_data_t *globals=(node_data_t*)gras_userdata_get();
123   INFO2("Received a get_successor message from %s for %d",
124         gras_socket_peer_name(expeditor),incoming.id);
125   if((globals->id==globals->finger[0].id)||
126      (incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){
127     outgoing.id=globals->finger[0].id;
128     snprintf(outgoing.host,1024,globals->finger[0].host);
129     outgoing.port=globals->finger[0].port;
130     INFO0("My successor is his successor!");
131   }else{
132     gras_socket_t temp_sock;
133     int contact=closest_preceding_node(incoming.id);
134     if(contact==-1){
135       outgoing.id=globals->finger[0].id;
136       snprintf(outgoing.host,1024,globals->finger[0].host);
137       outgoing.port=globals->finger[0].port;
138       INFO0("My successor is his successor!");
139     }else{
140       get_suc_t asking;asking.id=incoming.id;
141       TRY{
142         temp_sock=gras_socket_client(globals->finger[contact].host,
143                                      globals->finger[contact].port);
144       }CATCH(e){
145         RETHROW0("Unable to connect!: %s");
146       }
147       TRY{
148         gras_msg_send(temp_sock,"chord_get_suc",&asking);
149       }CATCH(e){
150         RETHROW0("Unable to ask!: %s");
151       }
152       gras_msg_wait(10.,"chord_rep_suc",&temp_sock, &outgoing);
153     }
154   }
155   
156   TRY{
157     gras_msg_send(expeditor,"chord_rep_suc",&outgoing);
158     INFO0("Successor information sent!");
159   }CATCH(e){
160     RETHROW2("%s:Timeout sending successor information to %s: %s",
161              globals->host,gras_socket_peer_name(expeditor));
162   }
163   gras_socket_close(expeditor);
164   return 0;
165 }
166
167 static int closest_preceding_node(int id){
168   node_data_t *globals=(node_data_t*)gras_userdata_get();
169   int i;
170   for(i=globals->fingers-1;i>=0;i--){
171     if(globals->finger[i].id>globals->id&&globals->finger[i].id<id){
172       return(i);
173     }
174   }
175   
176   return i;
177 }
178
179 static int node_cb_notify_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
180   gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);   
181   /*xbt_ex_t e;*/
182   notify_t incoming=*(notify_t*)payload_data;
183   node_data_t *globals=(node_data_t*)gras_userdata_get();
184   INFO2("Received a notifying message from %s as %d",
185         gras_socket_peer_name(expeditor),incoming.id);
186   if(globals->pre_id==-1||
187      (incoming.id>globals->pre_id&&incoming.id<globals->id)){
188     globals->pre_id=incoming.id;
189     snprintf(globals->pre_host,1024,incoming.host);
190     globals->pre_port=incoming.port;
191     INFO0("Set as my new predecessor!");
192   }
193   return 0;
194 }
195
196 static void fix_fingers(){
197         get_suc_t get_suc_msg;
198   xbt_ex_t e;
199   gras_socket_t temp_sock=NULL;
200   gras_socket_t temp_sock2=NULL;
201   rep_suc_t rep_suc_msg;
202   node_data_t *globals=(node_data_t*)gras_userdata_get();
203
204   TRY{
205     temp_sock=gras_socket_client(globals->host,globals->port);
206   }CATCH(e){
207     RETHROW0("Unable to contact known host: %s");
208   }
209
210   get_suc_msg.id=globals->id;
211   TRY{
212     gras_msg_send(temp_sock,"chord_get_suc",&get_suc_msg);
213   }CATCH(e){
214     gras_socket_close(temp_sock);
215     RETHROW0("Unable to contact known host to get successor!: %s");
216   }
217
218   TRY{
219     INFO0("Waiting for reply!");
220     gras_msg_wait(6000,"chord_rep_suc",&temp_sock2, &rep_suc_msg);
221   }CATCH(e){
222     RETHROW1("%s: Error waiting for successor:%s",globals->host);
223   }
224   globals->finger[0].id=rep_suc_msg.id;
225   snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
226   globals->finger[0].port=rep_suc_msg.port;
227   INFO1("→ Finger %d fixed!",globals->next_to_fix);
228   gras_socket_close(temp_sock);
229   
230   globals->next_to_fix=(++globals->next_to_fix==globals->fingers)?
231                        0:globals->next_to_fix;
232 }
233
234 static void check_predecessor(){
235   node_data_t *globals = (node_data_t*)gras_userdata_get();
236   gras_socket_t temp_sock;
237    ping_t ping;
238   pong_t pong;
239   xbt_ex_t e;
240   if (globals->pre_id == -1){
241     return;
242   }
243   TRY{
244     temp_sock = gras_socket_client( globals->pre_host, globals->pre_port );
245   }CATCH(e){
246     globals->pre_id = -1;
247     globals->pre_host[0] = 0;
248     globals->pre_port = 0;
249   }
250
251   ping.id = 0;
252   TRY{
253     gras_msg_send( temp_sock, "chord_ping",&ping);
254   }CATCH(e){
255     globals->pre_id = -1;
256     globals->pre_host[0] = 0;
257     globals->pre_port = 0;
258   }
259   TRY{
260     gras_msg_wait( 60, "chord_pong", &temp_sock, &pong);
261   }CATCH(e){
262     globals->pre_id = -1;
263     globals->pre_host[0] = 0;
264     globals->pre_port = 0;
265   }
266   gras_socket_close(temp_sock);
267 }
268
269 int node(int argc,char **argv){
270   node_data_t *globals=NULL;
271   gras_socket_t temp_sock=NULL;
272   gras_socket_t temp_sock2=NULL;
273   get_suc_t get_suc_msg;
274   rep_suc_t rep_suc_msg;
275
276   xbt_ex_t e;
277
278   int create=0;
279   int other_port=-1;
280   char *other_host;
281   notify_t notify_msg;
282   int l;
283
284   /* 1. Init the GRAS infrastructure and declare my globals */
285   gras_init(&argc,argv);
286    
287   gras_os_sleep((15-gras_os_getpid())*20);
288    
289   globals=gras_userdata_new(node_data_t);
290
291   globals->id=atoi(argv[1]);
292   globals->port=atoi(argv[2]);
293   globals->fingers=0;
294   globals->finger=NULL;
295   globals->pre_id=-1;
296   globals->pre_host[0]=0;
297   globals->pre_port=-1;
298   
299   snprintf(globals->host,1024,gras_os_myname());
300
301   if(argc==3){
302     create=1;
303   }else{
304     asprintf(&other_host,"%s",argv[3]);
305     other_port=atoi(argv[4]);
306   }
307   
308   globals->sock=gras_socket_server(globals->port);
309   gras_os_sleep(1.0);
310
311   register_messages();
312
313   globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem));
314   INFO2("Launching node %s:%d",globals->host,globals->port);
315   if(create){
316     INFO0("→Creating ring");
317     globals->finger[0].id=globals->id;
318     snprintf(globals->finger[0].host,1024,globals->host);
319     globals->finger[0].port=globals->port;
320   }else{
321     INFO2("→Known node %s:%d",other_host,other_port);
322     INFO0("→Contacting to determine successor");
323     TRY{
324       temp_sock=gras_socket_client(other_host,other_port);
325     }CATCH(e){
326       RETHROW0("Unable to contact known host: %s");
327     }
328
329         get_suc_msg.id=globals->id;
330     TRY{
331       gras_msg_send(temp_sock,"chord_get_suc", &get_suc_msg);
332     }CATCH(e){
333       gras_socket_close(temp_sock);
334       RETHROW0("Unable to contact known host to get successor!: %s");
335     }
336
337     TRY{
338       INFO0("Waiting for reply!");
339       gras_msg_wait(10.,"chord_rep_suc",&temp_sock2, &rep_suc_msg);
340     }CATCH(e){
341       RETHROW1("%s: Error waiting for successor:%s",globals->host);
342     }
343     globals->finger[0].id=rep_suc_msg.id;
344     snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
345     globals->finger[0].port=rep_suc_msg.port;
346     INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,
347           globals->finger[0].host,globals->finger[0].port);
348     gras_socket_close(temp_sock);
349     TRY{
350       temp_sock=gras_socket_client(globals->finger[0].host,
351                                    globals->finger[0].port);
352     }CATCH(e){
353       RETHROW0("Unable to contact successor: %s");
354     }
355
356         notify_msg.id=globals->id;
357     snprintf(notify_msg.host,1024,globals->host);
358     notify_msg.port=globals->port;
359     TRY{
360       gras_msg_send(temp_sock,"chord_notify",&notify_msg);
361     }CATCH(e){
362       RETHROW0("Unable to notify successor! %s");
363     }
364   }
365   
366   gras_cb_register("chord_get_suc", &node_cb_get_suc_handler);
367   gras_cb_register("chord_notify",  &node_cb_notify_handler);
368   /*gras_cb_register("chord_ping",&node_cb_ping_handler);*/
369  /* gras_timer_repeat(600.,fix_fingers);*/
370   /*while(1){*/
371
372   for(l=0;l<50;l++){
373     TRY{
374       gras_msg_handle(6000000.0);
375     }CATCH(e){
376     }
377   }
378   /*}*/
379   
380   gras_socket_close(globals->sock);
381   free(globals);
382   gras_exit();
383   INFO0("Done");
384   return(0);
385 }