Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
b6b9de1efbe5827095dd655454d0973e6477f72c
[simgrid.git] / examples / gras / p2p / chord / chord.c
1 /* Copyright (c) 2006, 2007, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "xbt/sysdep.h"
9 #include "gras.h"
10
11 static int closest_preceding_node(int id);
12 static void check_predecessor(void);
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(chord,"Messages specific to this example");
15
16 typedef enum msg_typus{
17   PING,
18   PONG,
19   GET_PRE,
20   REP_PRE,
21   GET_SUC,
22   REP_SUC,
23   STD,
24 }msg_typus;
25
26 /*GRAS_DEFINE_TYPE(s_pbio,
27   struct s_pbio{
28     msg_typus type;
29     int dest;
30     char msg[1024];
31   };
32 );
33 typedef struct s_pbio pbio_t;*/
34
35 /*GRAS_DEFINE_TYPE(s_ping,*/
36   struct s_ping{
37     int id;
38   };
39 /*);*/
40 typedef struct s_ping ping_t;
41
42 /*GRAS_DEFINE_TYPE(s_pong,*/
43   struct s_pong{
44     int id;
45     int failed;
46   };
47 /*);*/
48 typedef struct s_pong pong_t;
49
50 GRAS_DEFINE_TYPE(s_notify,
51   struct s_notify{
52     int id;
53     char host[1024];
54     int port;
55   };
56 );
57 typedef struct s_notify notify_t;
58
59 GRAS_DEFINE_TYPE(s_get_suc,
60   struct s_get_suc{
61     int id;
62   };
63 );
64 typedef struct s_get_suc get_suc_t;
65
66 GRAS_DEFINE_TYPE(s_rep_suc,
67   struct s_rep_suc{
68     int id;
69     char host[1024];
70     int port;
71   };
72 );
73 typedef struct s_rep_suc rep_suc_t;
74
75 typedef struct finger_elem{
76   int id;
77   char host[1024];
78   int port;
79 }finger_elem;
80
81
82
83 static void register_messages(){
84 /*  gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
85   gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc));
86   gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc));
87   gras_msgtype_declare("chord_notify",gras_datadesc_by_symbol(s_notify));
88 }
89
90 /* Global private data */
91 typedef struct{
92   gras_socket_t sock; /* server socket on which I'm listening */
93   int id; /* my id number */
94   char host[1024]; /* my host name */
95   int port; /* port on which I'm listening FIXME */
96   int fingers; /* how many fingers */
97   finger_elem *finger; /* finger table */
98   int next_to_fix; /* next in the finger list to be checked */
99   int pre_id; /* predecessor id */
100   char pre_host[1024]; /* predecessor host */
101   int pre_port; /* predecessor port */
102 }node_data_t;
103
104
105 int node(int argc,char **argv);
106
107 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
108   xbt_ex_t e;
109   pbio_t pbio_i=*(pbio_t*)payload_data;
110
111   node_data_t *globals=(node_data_t*)gras_userdata_get();
112
113   INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
114
115
116
117 }*/
118
119 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
120   gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);   
121   xbt_ex_t e;
122   get_suc_t incoming=*(get_suc_t*)payload_data;
123   rep_suc_t outgoing;
124   node_data_t *globals=(node_data_t*)gras_userdata_get();
125   INFO2("Received a get_successor message from %s for %d",
126         gras_socket_peer_name(expeditor),incoming.id);
127   if((globals->id==globals->finger[0].id)||
128      (incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){
129     outgoing.id=globals->finger[0].id;
130     snprintf(outgoing.host,1024,globals->finger[0].host);
131     outgoing.port=globals->finger[0].port;
132     INFO0("My successor is his successor!");
133   }else{
134     gras_socket_t temp_sock;
135     int contact=closest_preceding_node(incoming.id);
136     if(contact==-1){
137       outgoing.id=globals->finger[0].id;
138       snprintf(outgoing.host,1024,globals->finger[0].host);
139       outgoing.port=globals->finger[0].port;
140       INFO0("My successor is his successor!");
141     }else{
142       get_suc_t asking;asking.id=incoming.id;
143       TRY{
144         temp_sock=gras_socket_client(globals->finger[contact].host,
145                                      globals->finger[contact].port);
146       }CATCH(e){
147         RETHROW0("Unable to connect!: %s");
148       }
149       TRY{
150         gras_msg_send(temp_sock,"chord_get_suc",&asking);
151       }CATCH(e){
152         RETHROW0("Unable to ask!: %s");
153       }
154       gras_msg_wait(10.,"chord_rep_suc",&temp_sock, &outgoing);
155     }
156   }
157   
158   TRY{
159     gras_msg_send(expeditor,"chord_rep_suc",&outgoing);
160     INFO0("Successor information sent!");
161   }CATCH(e){
162     RETHROW2("%s:Timeout sending successor information to %s: %s",
163              globals->host,gras_socket_peer_name(expeditor));
164   }
165   gras_socket_close(expeditor);
166   return 0;
167 }
168
169 static int closest_preceding_node(int id){
170   node_data_t *globals=(node_data_t*)gras_userdata_get();
171   int i;
172   for(i=globals->fingers-1;i>=0;i--){
173     if(globals->finger[i].id>globals->id&&globals->finger[i].id<id){
174       return(i);
175     }
176   }
177   
178   return i;
179 }
180
181 static int node_cb_notify_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
182   gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);   
183   /*xbt_ex_t e;*/
184   notify_t incoming=*(notify_t*)payload_data;
185   node_data_t *globals=(node_data_t*)gras_userdata_get();
186   INFO2("Received a notifying message from %s as %d",
187         gras_socket_peer_name(expeditor),incoming.id);
188   if(globals->pre_id==-1||
189      (incoming.id>globals->pre_id&&incoming.id<globals->id)){
190     globals->pre_id=incoming.id;
191     snprintf(globals->pre_host,1024,incoming.host);
192     globals->pre_port=incoming.port;
193     INFO0("Set as my new predecessor!");
194   }
195   return 0;
196 }
197
198 static void fix_fingers(){
199         get_suc_t get_suc_msg;
200   xbt_ex_t e;
201   gras_socket_t temp_sock=NULL;
202   gras_socket_t temp_sock2=NULL;
203   rep_suc_t rep_suc_msg;
204   node_data_t *globals=(node_data_t*)gras_userdata_get();
205
206   TRY{
207     temp_sock=gras_socket_client(globals->host,globals->port);
208   }CATCH(e){
209     RETHROW0("Unable to contact known host: %s");
210   }
211
212   get_suc_msg.id=globals->id;
213   TRY{
214     gras_msg_send(temp_sock,"chord_get_suc",&get_suc_msg);
215   }CATCH(e){
216     gras_socket_close(temp_sock);
217     RETHROW0("Unable to contact known host to get successor!: %s");
218   }
219
220   TRY{
221     INFO0("Waiting for reply!");
222     gras_msg_wait(6000,"chord_rep_suc",&temp_sock2, &rep_suc_msg);
223   }CATCH(e){
224     RETHROW1("%s: Error waiting for successor:%s",globals->host);
225   }
226   globals->finger[0].id=rep_suc_msg.id;
227   snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
228   globals->finger[0].port=rep_suc_msg.port;
229   INFO1("→ Finger %d fixed!",globals->next_to_fix);
230   gras_socket_close(temp_sock);
231   
232   globals->next_to_fix=(++globals->next_to_fix==globals->fingers)?
233                        0:globals->next_to_fix;
234 }
235
236 static void check_predecessor(){
237   node_data_t *globals = (node_data_t*)gras_userdata_get();
238   gras_socket_t temp_sock;
239    ping_t ping;
240   pong_t pong;
241   xbt_ex_t e;
242   if (globals->pre_id == -1){
243     return;
244   }
245   TRY{
246     temp_sock = gras_socket_client( globals->pre_host, globals->pre_port );
247   }CATCH(e){
248     globals->pre_id = -1;
249     globals->pre_host[0] = 0;
250     globals->pre_port = 0;
251   }
252
253   ping.id = 0;
254   TRY{
255     gras_msg_send( temp_sock, "chord_ping",&ping);
256   }CATCH(e){
257     globals->pre_id = -1;
258     globals->pre_host[0] = 0;
259     globals->pre_port = 0;
260   }
261   TRY{
262     gras_msg_wait( 60, "chord_pong", &temp_sock, &pong);
263   }CATCH(e){
264     globals->pre_id = -1;
265     globals->pre_host[0] = 0;
266     globals->pre_port = 0;
267   }
268   gras_socket_close(temp_sock);
269 }
270
271 int node(int argc,char **argv){
272   node_data_t *globals=NULL;
273   gras_socket_t temp_sock=NULL;
274   gras_socket_t temp_sock2=NULL;
275   get_suc_t get_suc_msg;
276   rep_suc_t rep_suc_msg;
277
278   xbt_ex_t e;
279
280   int create=0;
281   int other_port=-1;
282   char *other_host;
283   notify_t notify_msg;
284   int l;
285
286   /* 1. Init the GRAS infrastructure and declare my globals */
287   gras_init(&argc,argv);
288    
289   gras_os_sleep((15-gras_os_getpid())*20);
290    
291   globals=gras_userdata_new(node_data_t);
292
293   globals->id=atoi(argv[1]);
294   globals->port=atoi(argv[2]);
295   globals->fingers=0;
296   globals->finger=NULL;
297   globals->pre_id=-1;
298   globals->pre_host[0]=0;
299   globals->pre_port=-1;
300   
301   snprintf(globals->host,1024,gras_os_myname());
302
303   if(argc==3){
304     create=1;
305   }else{
306     asprintf(&other_host,"%s",argv[3]);
307     other_port=atoi(argv[4]);
308   }
309   
310   globals->sock=gras_socket_server(globals->port);
311   gras_os_sleep(1.0);
312
313   register_messages();
314
315   globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem));
316   INFO2("Launching node %s:%d",globals->host,globals->port);
317   if(create){
318     INFO0("→Creating ring");
319     globals->finger[0].id=globals->id;
320     snprintf(globals->finger[0].host,1024,globals->host);
321     globals->finger[0].port=globals->port;
322   }else{
323     INFO2("→Known node %s:%d",other_host,other_port);
324     INFO0("→Contacting to determine successor");
325     TRY{
326       temp_sock=gras_socket_client(other_host,other_port);
327     }CATCH(e){
328       RETHROW0("Unable to contact known host: %s");
329     }
330
331         get_suc_msg.id=globals->id;
332     TRY{
333       gras_msg_send(temp_sock,"chord_get_suc", &get_suc_msg);
334     }CATCH(e){
335       gras_socket_close(temp_sock);
336       RETHROW0("Unable to contact known host to get successor!: %s");
337     }
338
339     TRY{
340       INFO0("Waiting for reply!");
341       gras_msg_wait(10.,"chord_rep_suc",&temp_sock2, &rep_suc_msg);
342     }CATCH(e){
343       RETHROW1("%s: Error waiting for successor:%s",globals->host);
344     }
345     globals->finger[0].id=rep_suc_msg.id;
346     snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
347     globals->finger[0].port=rep_suc_msg.port;
348     INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,
349           globals->finger[0].host,globals->finger[0].port);
350     gras_socket_close(temp_sock);
351     TRY{
352       temp_sock=gras_socket_client(globals->finger[0].host,
353                                    globals->finger[0].port);
354     }CATCH(e){
355       RETHROW0("Unable to contact successor: %s");
356     }
357
358         notify_msg.id=globals->id;
359     snprintf(notify_msg.host,1024,globals->host);
360     notify_msg.port=globals->port;
361     TRY{
362       gras_msg_send(temp_sock,"chord_notify",&notify_msg);
363     }CATCH(e){
364       RETHROW0("Unable to notify successor! %s");
365     }
366   }
367   
368   gras_cb_register("chord_get_suc", &node_cb_get_suc_handler);
369   gras_cb_register("chord_notify",  &node_cb_notify_handler);
370   /*gras_cb_register("chord_ping",&node_cb_ping_handler);*/
371  /* gras_timer_repeat(600.,fix_fingers);*/
372   /*while(1){*/
373
374   for(l=0;l<50;l++){
375     TRY{
376       gras_msg_handle(6000000.0);
377     }CATCH(e){
378     }
379   }
380   /*}*/
381   
382   gras_socket_close(globals->sock);
383   free(globals);
384   gras_exit();
385   INFO0("Done");
386   return(0);
387 }