Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
I don't think it does what it should now but at least it looks better and
[simgrid.git] / examples / gras / p2p / chord / chord.c
1 /* 
2  * vim:ts=2:sw=2:noexpandtab
3  */
4
5 #include "xbt/sysdep.h"
6 #include "gras.h"
7
8 static int closest_preceding_node(int id);
9 static void check_predecessor(void);
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(chord,"Messages specific to this example");
12
13 typedef enum msg_typus{
14         PING,
15         PONG,
16         GET_PRE,
17         REP_PRE,
18         GET_SUC,
19         REP_SUC,
20         STD,
21 }msg_typus;
22
23 /*GRAS_DEFINE_TYPE(s_pbio,
24         struct s_pbio{
25                 msg_typus type;
26                 int dest;
27                 char msg[1024];
28         };
29 );
30 typedef struct s_pbio pbio_t;*/
31
32 //GRAS_DEFINE_TYPE(s_ping,
33         struct s_ping{
34                 int id;
35         };
36 //);
37 typedef struct s_ping ping_t;
38
39 //GRAS_DEFINE_TYPE(s_pong,
40         struct s_pong{
41                 int id;
42                 int failed;
43         };
44 //);
45 typedef struct s_pong pong_t;
46
47 GRAS_DEFINE_TYPE(s_get_suc,
48         struct s_get_suc{
49                 int id;
50         };
51 );
52 typedef struct s_get_suc get_suc_t;
53
54 GRAS_DEFINE_TYPE(s_rep_suc,
55         struct s_rep_suc{
56                 int id;
57                 char host[1024];
58                 int port;
59         };
60 );
61 typedef struct s_rep_suc rep_suc_t;
62
63 typedef struct finger_elem{
64         int id;
65         char host[1024];
66         int port;
67 }finger_elem;
68
69
70
71 static void register_messages(){
72 /*      gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
73         gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc));
74         gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc));
75 }
76
77 /* Global private data */
78 typedef struct{
79         gras_socket_t sock; /* server socket on which I'm listening */
80         int id; /* my id number */
81         char host[1024]; /* my host name */
82         int port; /* port on which I'm listening FIXME */
83         int fingers; /* how many fingers */
84         finger_elem *finger; /* finger table */
85         char pre_host[1024]; /* predecessor host */
86         int pre_port; /* predecessor port */
87 }node_data_t;
88
89
90 int node(int argc,char **argv);
91
92 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
93         xbt_ex_t e;
94         pbio_t pbio_i=*(pbio_t*)payload_data;
95
96         node_data_t *globals=(node_data_t*)gras_userdata_get();
97
98         INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
99
100
101
102 }*/
103
104 static int node_cb_get_suc_handler(gras_socket_t expeditor,void *payload_data){
105         xbt_ex_t e;
106         get_suc_t incoming=*(get_suc_t*)payload_data;
107         rep_suc_t outgoing;
108         node_data_t *globals=(node_data_t*)gras_userdata_get();
109         INFO2("Received a get_successor message from %s for %d",gras_socket_peer_name(expeditor),incoming.id);
110         if((globals->id==globals->finger[0].id)||(incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){
111                 outgoing.id=globals->finger[0].id;
112                 snprintf(outgoing.host,1024,globals->finger[0].host);
113                 outgoing.port=globals->finger[0].port;
114                 INFO0("My successor is his successor!");
115         }else{
116                 gras_socket_t temp_sock;
117                 int contact=closest_preceding_node(incoming.id);
118                 get_suc_t asking;asking.id=incoming.id;
119                 TRY{
120                         temp_sock=gras_socket_client(globals->finger[contact].host,globals->finger[contact].port);
121                 }CATCH(e){
122                         RETHROW0("Unable to connect!: %s");
123                 }
124                 TRY{
125                         gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&asking);
126                 }CATCH(e){
127                         RETHROW0("Unable to ask!: %s");
128                 }
129                 gras_msg_wait(10.0,gras_msgtype_by_name("chord_rep_suc"),&temp_sock,&outgoing);
130         }
131
132         TRY{
133                 gras_msg_send(expeditor,gras_msgtype_by_name("chord_rep_suc"),&outgoing);
134                 INFO0("Successor information sent!");
135         }CATCH(e){
136                 RETHROW2("%s:Timeout sending successor information to %s: %s",globals->host,gras_socket_peer_name(expeditor));
137         }
138         gras_socket_close(expeditor);
139         return(1);
140 }
141
142 static int closest_preceding_node(int id){
143         node_data_t *globals=(node_data_t*)gras_userdata_get();
144         int i;
145         for(i=globals->fingers-1;i>=0;i--){
146                 if(globals->finger[i].id>globals->id&&globals->finger[i].id<id){
147                         return(i);
148                 }
149         }
150         return i;
151 }
152
153
154 static void check_predecessor() {
155   node_data_t *globals = (node_data_t*)gras_userdata_get();
156   gras_socket_t temp_sock;
157   xbt_ex_t e;
158 if (globals->pre_host[0] == 0)
159         {
160         return;
161         }
162 TRY
163         {
164         temp_sock = gras_socket_client( globals->pre_host, globals->pre_port );
165         }
166 CATCH(e)
167         {
168         globals->pre_host[0] = 0;
169         globals->pre_port = 0;
170         }
171 ping_t ping_kong;
172 pong_t king_pong;
173 ping_kong.id = 0;
174 TRY
175         {
176         gras_msg_send( temp_sock, gras_msgtype_by_name("chord_ping"),&ping_kong);
177         }
178 CATCH(e)
179         {
180         globals->pre_host[0] = 0;
181         globals->pre_port = 0;
182         }
183 TRY
184         {
185         gras_msg_wait( 6969, gras_msgtype_by_name("chord_pong"), &temp_sock, &king_pong);
186         }
187 CATCH(e)
188         {
189         globals->pre_host[0] = 0;
190         globals->pre_port = 0;
191         }
192 gras_socket_close(temp_sock);
193 }
194
195 int node(int argc,char **argv){
196         node_data_t *globals=NULL;
197         gras_socket_t temp_sock=NULL;
198         gras_socket_t temp_sock2=NULL;
199
200         xbt_ex_t e;
201
202         int create=0;
203         int other_port=-1;
204         char *other_host;
205
206         /* 1. Init the GRAS infrastructure and declare my globals */
207         gras_init(&argc,argv);
208    
209    gras_os_sleep((15-gras_os_getpid())*20);
210    
211         globals=gras_userdata_new(node_data_t);
212
213         globals->id=atoi(argv[1]);
214         globals->port=atoi(argv[2]);
215         globals->fingers=0;
216         globals->finger=NULL;
217         globals->pre_host[0]=0;
218         globals->pre_port=-1;
219         
220         snprintf(globals->host,1024,gras_os_myname());
221
222         if(argc==3){
223                 create=1;
224         }else{
225                 asprintf(&other_host,"%s",argv[3]);
226                 other_port=atoi(argv[4]);
227         }
228         
229         globals->sock=gras_socket_server(globals->port);
230         gras_os_sleep(1.0);
231
232         register_messages();
233         register_messages();
234
235         globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem));
236         INFO2("Launching node %s:%d",globals->host,globals->port);
237         if(create){
238                 INFO0("→Creating ring");
239                 globals->finger[0].id=globals->id;
240                 snprintf(globals->finger[0].host,1024,globals->host);
241                 globals->finger[0].port=globals->port;
242         }else{
243                 INFO2("→Known node %s:%d",other_host,other_port);
244                 INFO0("→Contacting to determine successor");
245                 TRY{
246                         temp_sock=gras_socket_client(other_host,other_port);
247                 }CATCH(e){
248                         RETHROW0("Unable to contact known host!: %s");
249                 }
250                 get_suc_t get_suc_msg;get_suc_msg.id=globals->id;
251                 TRY{
252                         gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&get_suc_msg);
253                 }CATCH(e){
254                         gras_socket_close(temp_sock);
255                         RETHROW0("Unable to contact known host to get successor!: %s");
256                 }
257                 rep_suc_t rep_suc_msg;
258                 TRY{
259                         INFO0("Waiting for reply!");
260                         gras_msg_wait(6000,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2,&rep_suc_msg);
261                 }CATCH(e){
262                         RETHROW1("%s: Error waiting for successor:%s",globals->host);
263                 }
264                 globals->finger[0].id=rep_suc_msg.id;
265                 snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
266                 globals->finger[0].port=rep_suc_msg.port;
267                 INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,globals->finger[0].host,globals->finger[0].port);
268                 gras_socket_close(temp_sock);
269         }
270
271         gras_cb_register(gras_msgtype_by_name("chord_get_suc"),&node_cb_get_suc_handler);
272 //      gras_cb_register(gras_msgtype_by_name("chord_ping"),&node_cb_ping_handler);
273
274         gras_msg_handle(60.0);
275
276         gras_socket_close(globals->sock);
277         free(globals);
278         gras_exit();
279         INFO0("Done");
280         return(0);
281 }