Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f72bef8570f85a12beda24af873b2c773307b7ca
[simgrid.git] / examples / gras / p2p / chord / chord.c
1 /* Copyright (c) 2006, 2007, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "xbt/sysdep.h"
9 #include "gras.h"
10
11 static int closest_preceding_node(int id);
12 static void check_predecessor(void);
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(chord, "Messages specific to this example");
15
16 typedef enum msg_typus {
17   PING,
18   PONG,
19   GET_PRE,
20   REP_PRE,
21   GET_SUC,
22   REP_SUC,
23   STD,
24 } msg_typus;
25
26 /*GRAS_DEFINE_TYPE(s_pbio,
27   struct s_pbio{
28     msg_typus type;
29     int dest;
30     char msg[1024];
31   };
32 );
33 typedef struct s_pbio pbio_t;*/
34
35 /*GRAS_DEFINE_TYPE(s_ping,*/
36 struct s_ping {
37   int id;
38 };
39 /*);*/
40 typedef struct s_ping ping_t;
41
42 /*GRAS_DEFINE_TYPE(s_pong,*/
43 struct s_pong {
44   int id;
45   int failed;
46 };
47 /*);*/
48 typedef struct s_pong pong_t;
49
50 GRAS_DEFINE_TYPE(s_notify, struct s_notify {
51                  int id; char host[1024]; int port;};);
52
53 typedef struct s_notify notify_t;
54
55 GRAS_DEFINE_TYPE(s_get_suc, struct s_get_suc {
56                  int id;};);
57
58 typedef struct s_get_suc get_suc_t;
59
60 GRAS_DEFINE_TYPE(s_rep_suc, struct s_rep_suc {
61                  int id; char host[1024]; int port;};);
62
63 typedef struct s_rep_suc rep_suc_t;
64
65 typedef struct finger_elem {
66   int id;
67   char host[1024];
68   int port;
69 } finger_elem;
70
71
72
73 static void register_messages()
74 {
75 /*  gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
76   gras_msgtype_declare("chord_get_suc",
77                        gras_datadesc_by_symbol(s_get_suc));
78   gras_msgtype_declare("chord_rep_suc",
79                        gras_datadesc_by_symbol(s_rep_suc));
80   gras_msgtype_declare("chord_notify", gras_datadesc_by_symbol(s_notify));
81 }
82
83 /* Global private data */
84 typedef struct {
85   gras_socket_t sock;           /* server socket on which I'm listening */
86   int id;                       /* my id number */
87   char host[1024];              /* my host name */
88   int port;                     /* port on which I'm listening FIXME */
89   int fingers;                  /* how many fingers */
90   finger_elem *finger;          /* finger table */
91   int next_to_fix;              /* next in the finger list to be checked */
92   int pre_id;                   /* predecessor id */
93   char pre_host[1024];          /* predecessor host */
94   int pre_port;                 /* predecessor port */
95 } node_data_t;
96
97
98 int node(int argc, char **argv);
99
100 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
101   xbt_ex_t e;
102   pbio_t pbio_i=*(pbio_t*)payload_data;
103
104   node_data_t *globals=(node_data_t*)gras_userdata_get();
105
106   XBT_INFO(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
107
108
109
110 }*/
111
112 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,
113                                    void *payload_data)
114 {
115   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
116   xbt_ex_t e;
117   get_suc_t incoming = *(get_suc_t *) payload_data;
118   rep_suc_t outgoing;
119   node_data_t *globals = (node_data_t *) gras_userdata_get();
120   XBT_INFO("Received a get_successor message from %s for %d",
121         gras_socket_peer_name(expeditor), incoming.id);
122   if ((globals->id == globals->finger[0].id) ||
123       (incoming.id > globals->id
124        && incoming.id <= globals->finger[0].id)) {
125     outgoing.id = globals->finger[0].id;
126     snprintf(outgoing.host, 1024, globals->finger[0].host);
127     outgoing.port = globals->finger[0].port;
128     XBT_INFO("My successor is his successor!");
129   } else {
130     gras_socket_t temp_sock;
131     int contact = closest_preceding_node(incoming.id);
132     if (contact == -1) {
133       outgoing.id = globals->finger[0].id;
134       snprintf(outgoing.host, 1024, globals->finger[0].host);
135       outgoing.port = globals->finger[0].port;
136       XBT_INFO("My successor is his successor!");
137     } else {
138       get_suc_t asking;
139       asking.id = incoming.id;
140       TRY {
141         temp_sock = gras_socket_client(globals->finger[contact].host,
142                                        globals->finger[contact].port);
143       }
144       CATCH(e) {
145         RETHROWF("Unable to connect!: %s");
146       }
147       TRY {
148         gras_msg_send(temp_sock, "chord_get_suc", &asking);
149       }
150       CATCH(e) {
151         RETHROWF("Unable to ask!: %s");
152       }
153       gras_msg_wait(10., "chord_rep_suc", &temp_sock, &outgoing);
154     }
155   }
156
157   TRY {
158     gras_msg_send(expeditor, "chord_rep_suc", &outgoing);
159     XBT_INFO("Successor information sent!");
160   }
161   CATCH(e) {
162     RETHROWF("%s:Timeout sending successor information to %s: %s",
163              globals->host, gras_socket_peer_name(expeditor));
164   }
165   gras_socket_close(expeditor);
166   return 0;
167 }
168
169 static int closest_preceding_node(int id)
170 {
171   node_data_t *globals = (node_data_t *) gras_userdata_get();
172   int i;
173   for (i = globals->fingers - 1; i >= 0; i--) {
174     if (globals->finger[i].id > globals->id && globals->finger[i].id < id) {
175       return (i);
176     }
177   }
178
179   return i;
180 }
181
182 static int node_cb_notify_handler(gras_msg_cb_ctx_t ctx,
183                                   void *payload_data)
184 {
185   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
186   /*xbt_ex_t e; */
187   notify_t incoming = *(notify_t *) payload_data;
188   node_data_t *globals = (node_data_t *) gras_userdata_get();
189   XBT_INFO("Received a notifying message from %s as %d",
190         gras_socket_peer_name(expeditor), incoming.id);
191   if (globals->pre_id == -1 ||
192       (incoming.id > globals->pre_id && incoming.id < globals->id)) {
193     globals->pre_id = incoming.id;
194     snprintf(globals->pre_host, 1024, incoming.host);
195     globals->pre_port = incoming.port;
196     XBT_INFO("Set as my new predecessor!");
197   }
198   return 0;
199 }
200
201 static void fix_fingers()
202 {
203   get_suc_t get_suc_msg;
204   xbt_ex_t e;
205   gras_socket_t temp_sock = NULL;
206   gras_socket_t temp_sock2 = NULL;
207   rep_suc_t rep_suc_msg;
208   node_data_t *globals = (node_data_t *) gras_userdata_get();
209
210   TRY {
211     temp_sock = gras_socket_client(globals->host, globals->port);
212   } CATCH(e) {
213     RETHROWF("Unable to contact known host: %s");
214   }
215
216   get_suc_msg.id = globals->id;
217   TRY {
218     gras_msg_send(temp_sock, "chord_get_suc", &get_suc_msg);
219   } CATCH(e) {
220     gras_socket_close(temp_sock);
221     RETHROWF("Unable to contact known host to get successor!: %s");
222   }
223
224   TRY {
225     XBT_INFO("Waiting for reply!");
226     gras_msg_wait(6000, "chord_rep_suc", &temp_sock2, &rep_suc_msg);
227   } CATCH(e) {
228     RETHROWF("%s: Error waiting for successor:%s", globals->host);
229   }
230   globals->finger[0].id = rep_suc_msg.id;
231   snprintf(globals->finger[0].host, 1024, rep_suc_msg.host);
232   globals->finger[0].port = rep_suc_msg.port;
233   XBT_INFO("→ Finger %d fixed!", globals->next_to_fix);
234   gras_socket_close(temp_sock);
235
236   globals->next_to_fix = (++globals->next_to_fix == globals->fingers) ?
237       0 : globals->next_to_fix;
238 }
239
240 static void check_predecessor()
241 {
242   node_data_t *globals = (node_data_t *) gras_userdata_get();
243   gras_socket_t temp_sock;
244   ping_t ping;
245   pong_t pong;
246   xbt_ex_t e;
247   if (globals->pre_id == -1) {
248     return;
249   }
250   TRY {
251     temp_sock = gras_socket_client(globals->pre_host, globals->pre_port);
252   }
253   CATCH(e) {
254     globals->pre_id = -1;
255     globals->pre_host[0] = 0;
256     globals->pre_port = 0;
257     xbt_ex_free(e);
258   }
259
260   ping.id = 0;
261   TRY {
262     gras_msg_send(temp_sock, "chord_ping", &ping);
263   }
264   CATCH(e) {
265     globals->pre_id = -1;
266     globals->pre_host[0] = 0;
267     globals->pre_port = 0;
268     xbt_ex_free(e);
269   }
270   TRY {
271     gras_msg_wait(60, "chord_pong", &temp_sock, &pong);
272   }
273   CATCH(e) {
274     globals->pre_id = -1;
275     globals->pre_host[0] = 0;
276     globals->pre_port = 0;
277     xbt_ex_free(e);
278   }
279   gras_socket_close(temp_sock);
280 }
281
282 int node(int argc, char **argv)
283 {
284   node_data_t *globals = NULL;
285   gras_socket_t temp_sock = NULL;
286   gras_socket_t temp_sock2 = NULL;
287   get_suc_t get_suc_msg;
288   rep_suc_t rep_suc_msg;
289
290   xbt_ex_t e;
291
292   int create = 0;
293   int other_port = -1;
294   char *other_host;
295   notify_t notify_msg;
296   int l;
297
298   /* 1. Init the GRAS infrastructure and declare my globals */
299   gras_init(&argc, argv);
300
301   gras_os_sleep((15 - gras_os_getpid()) * 20);
302
303   globals = gras_userdata_new(node_data_t);
304
305   globals->id = atoi(argv[1]);
306   globals->port = atoi(argv[2]);
307   globals->fingers = 0;
308   globals->finger = NULL;
309   globals->pre_id = -1;
310   globals->pre_host[0] = 0;
311   globals->pre_port = -1;
312
313   snprintf(globals->host, 1024, gras_os_myname());
314
315   if (argc == 3) {
316     create = 1;
317   } else {
318     other_host = xbt_strdup(argv[3]);
319     other_port = atoi(argv[4]);
320   }
321
322   globals->sock = gras_socket_server(globals->port);
323   gras_os_sleep(1.0);
324
325   register_messages();
326
327   globals->finger = (finger_elem *) calloc(1, sizeof(finger_elem));
328   XBT_INFO("Launching node %s:%d", globals->host, globals->port);
329   if (create) {
330     XBT_INFO("→Creating ring");
331     globals->finger[0].id = globals->id;
332     snprintf(globals->finger[0].host, 1024, globals->host);
333     globals->finger[0].port = globals->port;
334   } else {
335     XBT_INFO("→Known node %s:%d", other_host, other_port);
336     XBT_INFO("→Contacting to determine successor");
337     TRY {
338       temp_sock = gras_socket_client(other_host, other_port);
339     }
340     CATCH(e) {
341       RETHROWF("Unable to contact known host: %s");
342     }
343
344     get_suc_msg.id = globals->id;
345     TRY {
346       gras_msg_send(temp_sock, "chord_get_suc", &get_suc_msg);
347     }
348     CATCH(e) {
349       gras_socket_close(temp_sock);
350       RETHROWF("Unable to contact known host to get successor!: %s");
351     }
352
353     TRY {
354       XBT_INFO("Waiting for reply!");
355       gras_msg_wait(10., "chord_rep_suc", &temp_sock2, &rep_suc_msg);
356     }
357     CATCH(e) {
358       RETHROWF("%s: Error waiting for successor:%s", globals->host);
359     }
360     globals->finger[0].id = rep_suc_msg.id;
361     snprintf(globals->finger[0].host, 1024, rep_suc_msg.host);
362     globals->finger[0].port = rep_suc_msg.port;
363     XBT_INFO("→Got successor : %d-%s:%d", globals->finger[0].id,
364           globals->finger[0].host, globals->finger[0].port);
365     gras_socket_close(temp_sock);
366     TRY {
367       temp_sock = gras_socket_client(globals->finger[0].host,
368                                      globals->finger[0].port);
369     }
370     CATCH(e) {
371       RETHROWF("Unable to contact successor: %s");
372     }
373
374     notify_msg.id = globals->id;
375     snprintf(notify_msg.host, 1024, globals->host);
376     notify_msg.port = globals->port;
377     TRY {
378       gras_msg_send(temp_sock, "chord_notify", &notify_msg);
379     }
380     CATCH(e) {
381       RETHROWF("Unable to notify successor! %s");
382     }
383   }
384
385   gras_cb_register("chord_get_suc", &node_cb_get_suc_handler);
386   gras_cb_register("chord_notify", &node_cb_notify_handler);
387   /*gras_cb_register("chord_ping",&node_cb_ping_handler); */
388   /* gras_timer_repeat(600.,fix_fingers); */
389   /*while(1){ */
390
391   for (l = 0; l < 50; l++) {
392     TRY {
393       gras_msg_handle(6000000.0);
394     }
395     CATCH(e) {
396       xbt_ex_free(e);
397     }
398   }
399   /*} */
400
401   gras_socket_close(globals->sock);
402   free(globals);
403   gras_exit();
404   XBT_INFO("Done");
405   return (0);
406 }