/*
 * AND — Algorithmique Numérique Distribuée, public GIT repository
 * Revision: 5789cb14527d035acd992cd4c91af4083f23349f
 * File: [simgrid.git] / examples / gras / p2p / chord / chord.c
 */
1 /* 
2  * vim:ts=2:sw=2:noexpandtab
3  */
4
5 #include "xbt/sysdep.h"
6 #include "gras.h"
7
8 static int closest_preceding_node(int id);
9 static void check_predecessor(void);
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(chord,"Messages specific to this example");
12
/*
 * Message kinds. The numeric values are spelled out explicitly; they are
 * the same values the compiler would assign implicitly (0..6).
 */
typedef enum msg_typus {
        PING    = 0,
        PONG    = 1,
        GET_PRE = 2,
        REP_PRE = 3,
        GET_SUC = 4,
        REP_SUC = 5,
        STD     = 6
} msg_typus;
22
23 /*GRAS_DEFINE_TYPE(s_pbio,
24         struct s_pbio{
25                 msg_typus type;
26                 int dest;
27                 char msg[1024];
28         };
29 );
30 typedef struct s_pbio pbio_t;*/
31
/*
 * Payload sent with the "chord_ping" message (see check_predecessor()).
 * The GRAS_DEFINE_TYPE() wrapper below is commented out, so this is a plain
 * C struct declaration.
 */
//GRAS_DEFINE_TYPE(s_ping,
struct s_ping {
        int id;
};
//);
typedef struct s_ping ping_t;
38
/*
 * Payload received with the "chord_pong" message (see check_predecessor()).
 * The GRAS_DEFINE_TYPE() wrapper below is commented out, so this is a plain
 * C struct declaration.
 */
//GRAS_DEFINE_TYPE(s_pong,
struct s_pong {
        int id;
        int failed;
};
//);
typedef struct s_pong pong_t;
46
47 GRAS_DEFINE_TYPE(s_get_suc,
48         struct s_get_suc{
49                 int id;
50         };
51 );
52 typedef struct s_get_suc get_suc_t;
53
54 GRAS_DEFINE_TYPE(s_rep_suc,
55         struct s_rep_suc{
56                 int id;
57                 char host[1024];
58                 int port;
59         };
60 );
61 typedef struct s_rep_suc rep_suc_t;
62
/* One entry of a node's finger table: the id and address of a known node. */
typedef struct finger_elem {
        int  id;
        char host[1024];
        int  port;
} finger_elem;
68
69
70
71 static void register_messages(){
72 /*      gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
73         gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc));
74         gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc));
75 }
76
77 /* Global private data */
78 typedef struct{
79         gras_socket_t sock; /* server socket on which I'm listening */
80         int id; /* my id number */
81         char host[1024]; /* my host name */
82         int port; /* port on which I'm listening FIXME */
83         int fingers; /* how many fingers */
84         finger_elem *finger; /* finger table */
85         char pre_host[1024]; /* predecessor host */
86         int pre_port; /* predecessor port */
87 }node_data_t;
88
89
/* Entry point of a chord node process; defined at the end of this file. */
int node(int argc, char **argv);
91
92 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
93         xbt_ex_t e;
94         pbio_t pbio_i=*(pbio_t*)payload_data;
95
96         node_data_t *globals=(node_data_t*)gras_userdata_get();
97
98         INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
99
100
101
102 }*/
103
104 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
105         gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);   
106         xbt_ex_t e;
107         get_suc_t incoming=*(get_suc_t*)payload_data;
108         rep_suc_t outgoing;
109         node_data_t *globals=(node_data_t*)gras_userdata_get();
110         INFO2("Received a get_successor message from %s for %d",gras_socket_peer_name(expeditor),incoming.id);
111         if((globals->id==globals->finger[0].id)||(incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){
112                 outgoing.id=globals->finger[0].id;
113                 snprintf(outgoing.host,1024,globals->finger[0].host);
114                 outgoing.port=globals->finger[0].port;
115                 INFO0("My successor is his successor!");
116         }else{
117                 gras_socket_t temp_sock;
118                 int contact=closest_preceding_node(incoming.id);
119                 get_suc_t asking;asking.id=incoming.id;
120                 TRY{
121                         temp_sock=gras_socket_client(globals->finger[contact].host,globals->finger[contact].port);
122                 }CATCH(e){
123                         RETHROW0("Unable to connect!: %s");
124                 }
125                 TRY{
126                         gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&asking);
127                 }CATCH(e){
128                         RETHROW0("Unable to ask!: %s");
129                 }
130                 gras_msg_wait(10.0,gras_msgtype_by_name("chord_rep_suc"),&temp_sock,&outgoing);
131         }
132
133         TRY{
134                 gras_msg_send(expeditor,gras_msgtype_by_name("chord_rep_suc"),&outgoing);
135                 INFO0("Successor information sent!");
136         }CATCH(e){
137                 RETHROW2("%s:Timeout sending successor information to %s: %s",globals->host,gras_socket_peer_name(expeditor));
138         }
139         gras_socket_close(expeditor);
140         return(1);
141 }
142
143 static int closest_preceding_node(int id){
144         node_data_t *globals=(node_data_t*)gras_userdata_get();
145         int i;
146         for(i=globals->fingers-1;i>=0;i--){
147                 if(globals->finger[i].id>globals->id&&globals->finger[i].id<id){
148                         return(i);
149                 }
150         }
151         return i;
152 }
153
154
155 static void check_predecessor() {
156   node_data_t *globals = (node_data_t*)gras_userdata_get();
157   gras_socket_t temp_sock;
158   xbt_ex_t e;
159 if (globals->pre_host[0] == 0)
160         {
161         return;
162         }
163 TRY
164         {
165         temp_sock = gras_socket_client( globals->pre_host, globals->pre_port );
166         }
167 CATCH(e)
168         {
169         globals->pre_host[0] = 0;
170         globals->pre_port = 0;
171         }
172 ping_t ping_kong;
173 pong_t king_pong;
174 ping_kong.id = 0;
175 TRY
176         {
177         gras_msg_send( temp_sock, gras_msgtype_by_name("chord_ping"),&ping_kong);
178         }
179 CATCH(e)
180         {
181         globals->pre_host[0] = 0;
182         globals->pre_port = 0;
183         }
184 TRY
185         {
186         gras_msg_wait( 6969, gras_msgtype_by_name("chord_pong"), &temp_sock, &king_pong);
187         }
188 CATCH(e)
189         {
190         globals->pre_host[0] = 0;
191         globals->pre_port = 0;
192         }
193 gras_socket_close(temp_sock);
194 }
195
196 int node(int argc,char **argv){
197         node_data_t *globals=NULL;
198         gras_socket_t temp_sock=NULL;
199         gras_socket_t temp_sock2=NULL;
200
201         xbt_ex_t e;
202
203         int create=0;
204         int other_port=-1;
205         char *other_host;
206
207         /* 1. Init the GRAS infrastructure and declare my globals */
208         gras_init(&argc,argv);
209    
210    gras_os_sleep((15-gras_os_getpid())*20);
211    
212         globals=gras_userdata_new(node_data_t);
213
214         globals->id=atoi(argv[1]);
215         globals->port=atoi(argv[2]);
216         globals->fingers=0;
217         globals->finger=NULL;
218         globals->pre_host[0]=0;
219         globals->pre_port=-1;
220         
221         snprintf(globals->host,1024,gras_os_myname());
222
223         if(argc==3){
224                 create=1;
225         }else{
226                 asprintf(&other_host,"%s",argv[3]);
227                 other_port=atoi(argv[4]);
228         }
229         
230         globals->sock=gras_socket_server(globals->port);
231         gras_os_sleep(1.0);
232
233         register_messages();
234         register_messages();
235
236         globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem));
237         INFO2("Launching node %s:%d",globals->host,globals->port);
238         if(create){
239                 INFO0("→Creating ring");
240                 globals->finger[0].id=globals->id;
241                 snprintf(globals->finger[0].host,1024,globals->host);
242                 globals->finger[0].port=globals->port;
243         }else{
244                 INFO2("→Known node %s:%d",other_host,other_port);
245                 INFO0("→Contacting to determine successor");
246                 TRY{
247                         temp_sock=gras_socket_client(other_host,other_port);
248                 }CATCH(e){
249                         RETHROW0("Unable to contact known host!: %s");
250                 }
251                 get_suc_t get_suc_msg;get_suc_msg.id=globals->id;
252                 TRY{
253                         gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&get_suc_msg);
254                 }CATCH(e){
255                         gras_socket_close(temp_sock);
256                         RETHROW0("Unable to contact known host to get successor!: %s");
257                 }
258                 rep_suc_t rep_suc_msg;
259                 TRY{
260                         INFO0("Waiting for reply!");
261                         gras_msg_wait(6000,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2,&rep_suc_msg);
262                 }CATCH(e){
263                         RETHROW1("%s: Error waiting for successor:%s",globals->host);
264                 }
265                 globals->finger[0].id=rep_suc_msg.id;
266                 snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
267                 globals->finger[0].port=rep_suc_msg.port;
268                 INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,globals->finger[0].host,globals->finger[0].port);
269                 gras_socket_close(temp_sock);
270         }
271
272         gras_cb_register(gras_msgtype_by_name("chord_get_suc"),&node_cb_get_suc_handler);
273 //      gras_cb_register(gras_msgtype_by_name("chord_ping"),&node_cb_ping_handler);
274
275         gras_msg_handle(60.0);
276
277         gras_socket_close(globals->sock);
278         free(globals);
279         gras_exit();
280         INFO0("Done");
281         return(0);
282 }