Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
37d061e5df756311b3a94a74c5f91486a9dacf74
[simgrid.git] / examples / gras / p2p / chord / chord.c
1 /* 
2  * vim:ts=2:sw=2:noexpandtab
3  */
4
5 #include "chord.h"
6
7 static void register_messages(){
8 /*      gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
9         gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc));
10         gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc));
11 }
12
13 /* Global private data */
14 typedef struct{
15         gras_socket_t sock; /* server socket on which I'm listening */
16         int id; /* my id number */
17         char host[1024]; /* my host name */
18         int port; /* port on which I'm listening FIXME */
19         int fingers; /* how many fingers */
20         finger_elem *finger; /* finger table */
21         char pre_host[1024]; /* predecessor host */
22         int pre_port; /* predecessor port */
23 }node_data_t;
24
25
/* Entry point of a chord node process; defined at the end of this file. */
int node(int argc,char **argv);

/*
 * Forward declaration: node_cb_get_suc_handler() calls this function before
 * its definition appears, which would otherwise be an implicit declaration.
 */
int closest_preceding_node(int id);
27
28 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
29         xbt_ex_t e;
30         pbio_t pbio_i=*(pbio_t*)payload_data;
31
32         node_data_t *globals=(node_data_t*)gras_userdata_get();
33
34         INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
35
36
37
38 }*/
39
40 static int node_cb_get_suc_handler(gras_socket_t expeditor,void *payload_data){
41         xbt_ex_t e;
42         get_suc_t incoming=*(get_suc_t*)payload_data;
43         rep_suc_t outgoing;
44         node_data_t *globals=(node_data_t*)gras_userdata_get();
45         INFO2("Received a get_successor message from %s for %d",gras_socket_peer_name(expeditor),incoming.id);
46         if((globals->id==globals->finger[0].id)||(incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){
47                 outgoing.id=globals->finger[0].id;
48                 snprintf(outgoing.host,1024,globals->finger[0].host);
49                 outgoing.port=globals->finger[0].port;
50                 INFO0("My successor is his successor!");
51         }else{
52                 gras_socket_t temp_sock;
53                 int contact=closest_preceding_node(incoming.id);
54                 get_suc_t asking;asking.id=incoming.id;
55                 TRY{
56                         temp_sock=gras_socket_client(globals->finger[contact].host,globals->finger[contact].port);
57                 }CATCH(e){
58                         RETHROW0("Unable to connect!");
59                 }
60                 TRY{
61                         gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&asking);
62                 }CATCH(e){
63                         RETHROW0("Unable to ask!");
64                 }
65                 gras_msg_wait(10.0,gras_msgtype_by_name("chord_rep_suc"),&temp_sock,&outgoing);
66         }
67
68         TRY{
69                 gras_msg_send(expeditor,gras_msgtype_by_name("chord_rep_suc"),&outgoing);
70                 INFO0("Successor information sent!");
71         }CATCH(e){
72                 RETHROW2("%s:Timeout sending successor information to %s",globals->host,gras_socket_peer_name(expeditor));
73         }
74         gras_socket_close(expeditor);
75         return(1);
76 }
77
78 int closest_preceding_node(int id){
79         node_data_t *globals=(node_data_t*)gras_userdata_get();
80         int i;
81         for(i=globals->fingers-1;i>=0;i--){
82                 if(globals->finger[i].id>globals->id&&globals->finger[i].id<id){
83                         return(i);
84                 }
85         return(i);
86         }
87 }
88
89
90 void check_predecessor()
91 {
92 node_data_t *globals = (node_data_t*)gras_userdata_get();
93 gras_socket_t temp_sock;
94 if (globals->pre_host[0] == 0)
95         {
96         return;
97         }
98 TRY
99         {
100         temp_sock = gras_socket_client( globals->pre_host, globals->pre_port );
101         }
102 CATCH(e)
103         {
104         globals->pre_host[0] = 0;
105         globals->pre_port = 0;
106         }
107 ping_t ping_kong;
108 pong_t king_pong;
109 ping_kong->id = 0;
110 TRY
111         {
112         gras_msg_send( temp_sock, gras_msgtype_by_name("chord_ping"),&ping_kong);
113         }
114 CATCH(e)
115         {
116         globals->pre_host[0] = 0;
117         globals->pre_port = 0;
118         }
119 TRY
120         {
121         gras_msg_wait( 6969, gras_msgtype_by_name("chord_pong"), &temp_sock, &king_pong);
122         }
123 CATCH(e)
124         {
125         globals->pre_host[0] = 0;
126         globals->pre_port = 0;
127         }
128 gras_socket_close(temp_sock);
129 }
130
131 int node(int argc,char **argv){
132         node_data_t *globals=NULL;
133         gras_socket_t temp_sock,temp_sock2;
134
135         xbt_ex_t e;
136
137         int create=0;
138         int other_port;
139         char *other_host;
140
141         /* 1. Init the GRAS infrastructure and declare my globals */
142         gras_init(&argc,argv);
143         globals=gras_userdata_new(node_data_t);
144
145         globals->id=atoi(argv[1]);
146         globals->port=atoi(argv[2]);
147         globals->fingers=0;
148         globals->finger=NULL;
149         globals->pre_host[0]=0;
150         globals->pre_port=-1;
151         
152         snprintf(globals->host,1024,gras_os_myname());
153
154         if(argc==3){
155                 create=1;
156         }else{
157                 other_host=strndup(argv[3],1024);
158                 other_port=atoi(argv[4]);
159         }
160         
161         globals->sock=gras_socket_server(globals->port);
162         gras_os_sleep(1.0);
163
164         register_messages();
165         register_messages();
166
167         globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem));
168         INFO2("Launching node %s:%d",globals->host,globals->port);
169         if(create){
170                 INFO0("→Creating ring");
171                 globals->finger[0].id=globals->id;
172                 snprintf(globals->finger[0].host,1024,globals->host);
173                 globals->finger[0].port=globals->port;
174         }else{
175                 INFO2("→Known node %s:%d",other_host,other_port);
176                 INFO0("→Contacting to determine successor");
177                 TRY{
178                         temp_sock=gras_socket_client(other_host,other_port);
179                 }CATCH(e){
180                         RETHROW0("Unable to contact known host!");
181                 }
182                 get_suc_t get_suc_msg;get_suc_msg.id=globals->id;
183                 TRY{
184                         gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&get_suc_msg);
185                 }CATCH(e){
186                         gras_socket_close(temp_sock);
187                         RETHROW0("Unable to contact known host to get successor!");
188                 }
189                 rep_suc_t rep_suc_msg;
190                 TRY{
191                         INFO0("Waiting for reply!");
192                         gras_msg_wait(6000,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2,&rep_suc_msg);
193                 }CATCH(e){
194                         RETHROW2("%s: Error waiting for successor:%s",globals->host,e.msg);
195                 }
196                 globals->finger[0].id=rep_suc_msg.id;
197                 snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
198                 globals->finger[0].port=rep_suc_msg.port;
199                 INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,globals->finger[0].host,globals->finger[0].port);
200                 gras_socket_close(temp_sock);
201         }
202
203         gras_cb_register(gras_msgtype_by_name("chord_get_suc"),&node_cb_get_suc_handler);
204 //      gras_cb_register(gras_msgtype_by_name("chord_ping"),&node_cb_ping_handler);
205
206         gras_msg_handle(60.0);
207
208         gras_socket_close(globals->sock);
209         free(globals);
210         gras_exit();
211         INFO0("Done");
212         return(0);
213 }