Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use CATCH_ANONYMOUS whenever possible, and remove unused variables.
[simgrid.git] / examples / gras / p2p / chord / chord.c
1 /* Copyright (c) 2006, 2007, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "xbt/sysdep.h"
9 #include "gras.h"
10
11 static int closest_preceding_node(int id);
12 static void check_predecessor(void);
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(chord, "Messages specific to this example");
15
16 typedef enum msg_typus {
17   PING,
18   PONG,
19   GET_PRE,
20   REP_PRE,
21   GET_SUC,
22   REP_SUC,
23   STD,
24 } msg_typus;
25
26 /*GRAS_DEFINE_TYPE(s_pbio,
27   struct s_pbio{
28     msg_typus type;
29     int dest;
30     char msg[1024];
31   };
32 );
33 typedef struct s_pbio pbio_t;*/
34
35 /*GRAS_DEFINE_TYPE(s_ping,*/
36 struct s_ping {
37   int id;
38 };
39 /*);*/
40 typedef struct s_ping ping_t;
41
42 /*GRAS_DEFINE_TYPE(s_pong,*/
43 struct s_pong {
44   int id;
45   int failed;
46 };
47 /*);*/
48 typedef struct s_pong pong_t;
49
50 GRAS_DEFINE_TYPE(s_notify, struct s_notify {
51                  int id; char host[1024]; int port;};);
52
53 typedef struct s_notify notify_t;
54
55 GRAS_DEFINE_TYPE(s_get_suc, struct s_get_suc {
56                  int id;};);
57
58 typedef struct s_get_suc get_suc_t;
59
60 GRAS_DEFINE_TYPE(s_rep_suc, struct s_rep_suc {
61                  int id; char host[1024]; int port;};);
62
63 typedef struct s_rep_suc rep_suc_t;
64
65 typedef struct finger_elem {
66   int id;
67   char host[1024];
68   int port;
69 } finger_elem;
70
71
72
73 static void register_messages()
74 {
75 /*  gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
76   gras_msgtype_declare("chord_get_suc",
77                        gras_datadesc_by_symbol(s_get_suc));
78   gras_msgtype_declare("chord_rep_suc",
79                        gras_datadesc_by_symbol(s_rep_suc));
80   gras_msgtype_declare("chord_notify", gras_datadesc_by_symbol(s_notify));
81 }
82
83 /* Global private data */
84 typedef struct {
85   gras_socket_t sock;           /* server socket on which I'm listening */
86   int id;                       /* my id number */
87   char host[1024];              /* my host name */
88   int port;                     /* port on which I'm listening FIXME */
89   int fingers;                  /* how many fingers */
90   finger_elem *finger;          /* finger table */
91   int next_to_fix;              /* next in the finger list to be checked */
92   int pre_id;                   /* predecessor id */
93   char pre_host[1024];          /* predecessor host */
94   int pre_port;                 /* predecessor port */
95 } node_data_t;
96
97
98 int node(int argc, char **argv);
99
100 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
101   xbt_ex_t e;
102   pbio_t pbio_i=*(pbio_t*)payload_data;
103
104   node_data_t *globals=(node_data_t*)gras_userdata_get();
105
106   XBT_INFO(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
107
108
109
110 }*/
111
112 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,
113                                    void *payload_data)
114 {
115   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
116   get_suc_t incoming = *(get_suc_t *) payload_data;
117   rep_suc_t outgoing;
118   node_data_t *globals = (node_data_t *) gras_userdata_get();
119   XBT_INFO("Received a get_successor message from %s for %d",
120         gras_socket_peer_name(expeditor), incoming.id);
121   if ((globals->id == globals->finger[0].id) ||
122       (incoming.id > globals->id
123        && incoming.id <= globals->finger[0].id)) {
124     outgoing.id = globals->finger[0].id;
125     snprintf(outgoing.host, 1024, globals->finger[0].host);
126     outgoing.port = globals->finger[0].port;
127     XBT_INFO("My successor is his successor!");
128   } else {
129     gras_socket_t temp_sock;
130     int contact = closest_preceding_node(incoming.id);
131     if (contact == -1) {
132       outgoing.id = globals->finger[0].id;
133       snprintf(outgoing.host, 1024, globals->finger[0].host);
134       outgoing.port = globals->finger[0].port;
135       XBT_INFO("My successor is his successor!");
136     } else {
137       get_suc_t asking;
138       asking.id = incoming.id;
139       TRY {
140         temp_sock = gras_socket_client(globals->finger[contact].host,
141                                        globals->finger[contact].port);
142       }
143       CATCH_ANONYMOUS {
144         RETHROWF("Unable to connect!: %s");
145       }
146       TRY {
147         gras_msg_send(temp_sock, "chord_get_suc", &asking);
148       }
149       CATCH_ANONYMOUS {
150         RETHROWF("Unable to ask!: %s");
151       }
152       gras_msg_wait(10., "chord_rep_suc", &temp_sock, &outgoing);
153     }
154   }
155
156   TRY {
157     gras_msg_send(expeditor, "chord_rep_suc", &outgoing);
158     XBT_INFO("Successor information sent!");
159   }
160   CATCH_ANONYMOUS {
161     RETHROWF("%s:Timeout sending successor information to %s: %s",
162              globals->host, gras_socket_peer_name(expeditor));
163   }
164   gras_socket_close(expeditor);
165   return 0;
166 }
167
168 static int closest_preceding_node(int id)
169 {
170   node_data_t *globals = (node_data_t *) gras_userdata_get();
171   int i;
172   for (i = globals->fingers - 1; i >= 0; i--) {
173     if (globals->finger[i].id > globals->id && globals->finger[i].id < id) {
174       return (i);
175     }
176   }
177
178   return i;
179 }
180
181 static int node_cb_notify_handler(gras_msg_cb_ctx_t ctx,
182                                   void *payload_data)
183 {
184   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
185   /*xbt_ex_t e; */
186   notify_t incoming = *(notify_t *) payload_data;
187   node_data_t *globals = (node_data_t *) gras_userdata_get();
188   XBT_INFO("Received a notifying message from %s as %d",
189         gras_socket_peer_name(expeditor), incoming.id);
190   if (globals->pre_id == -1 ||
191       (incoming.id > globals->pre_id && incoming.id < globals->id)) {
192     globals->pre_id = incoming.id;
193     snprintf(globals->pre_host, 1024, incoming.host);
194     globals->pre_port = incoming.port;
195     XBT_INFO("Set as my new predecessor!");
196   }
197   return 0;
198 }
199
200 static void fix_fingers()
201 {
202   get_suc_t get_suc_msg;
203   gras_socket_t temp_sock = NULL;
204   gras_socket_t temp_sock2 = NULL;
205   rep_suc_t rep_suc_msg;
206   node_data_t *globals = (node_data_t *) gras_userdata_get();
207
208   TRY {
209     temp_sock = gras_socket_client(globals->host, globals->port);
210   }
211   CATCH_ANONYMOUS {
212     RETHROWF("Unable to contact known host: %s");
213   }
214
215   get_suc_msg.id = globals->id;
216   TRY {
217     gras_msg_send(temp_sock, "chord_get_suc", &get_suc_msg);
218   }
219   CATCH_ANONYMOUS {
220     gras_socket_close(temp_sock);
221     RETHROWF("Unable to contact known host to get successor!: %s");
222   }
223
224   TRY {
225     XBT_INFO("Waiting for reply!");
226     gras_msg_wait(6000, "chord_rep_suc", &temp_sock2, &rep_suc_msg);
227   }
228   CATCH_ANONYMOUS {
229     RETHROWF("%s: Error waiting for successor:%s", globals->host);
230   }
231   globals->finger[0].id = rep_suc_msg.id;
232   snprintf(globals->finger[0].host, 1024, rep_suc_msg.host);
233   globals->finger[0].port = rep_suc_msg.port;
234   XBT_INFO("→ Finger %d fixed!", globals->next_to_fix);
235   gras_socket_close(temp_sock);
236
237   globals->next_to_fix = (++globals->next_to_fix == globals->fingers) ?
238       0 : globals->next_to_fix;
239 }
240
241 static void check_predecessor()
242 {
243   node_data_t *globals = (node_data_t *) gras_userdata_get();
244   gras_socket_t temp_sock;
245   ping_t ping;
246   pong_t pong;
247   xbt_ex_t e;
248   if (globals->pre_id == -1) {
249     return;
250   }
251   TRY {
252     temp_sock = gras_socket_client(globals->pre_host, globals->pre_port);
253   }
254   CATCH(e) {
255     globals->pre_id = -1;
256     globals->pre_host[0] = 0;
257     globals->pre_port = 0;
258     xbt_ex_free(e);
259   }
260
261   ping.id = 0;
262   TRY {
263     gras_msg_send(temp_sock, "chord_ping", &ping);
264   }
265   CATCH(e) {
266     globals->pre_id = -1;
267     globals->pre_host[0] = 0;
268     globals->pre_port = 0;
269     xbt_ex_free(e);
270   }
271   TRY {
272     gras_msg_wait(60, "chord_pong", &temp_sock, &pong);
273   }
274   CATCH(e) {
275     globals->pre_id = -1;
276     globals->pre_host[0] = 0;
277     globals->pre_port = 0;
278     xbt_ex_free(e);
279   }
280   gras_socket_close(temp_sock);
281 }
282
283 int node(int argc, char **argv)
284 {
285   node_data_t *globals = NULL;
286   gras_socket_t temp_sock = NULL;
287   gras_socket_t temp_sock2 = NULL;
288   get_suc_t get_suc_msg;
289   rep_suc_t rep_suc_msg;
290
291   xbt_ex_t e;
292
293   int create = 0;
294   int other_port = -1;
295   char *other_host;
296   notify_t notify_msg;
297   int l;
298
299   /* 1. Init the GRAS infrastructure and declare my globals */
300   gras_init(&argc, argv);
301
302   gras_os_sleep((15 - gras_os_getpid()) * 20);
303
304   globals = gras_userdata_new(node_data_t);
305
306   globals->id = atoi(argv[1]);
307   globals->port = atoi(argv[2]);
308   globals->fingers = 0;
309   globals->finger = NULL;
310   globals->pre_id = -1;
311   globals->pre_host[0] = 0;
312   globals->pre_port = -1;
313
314   snprintf(globals->host, 1024, gras_os_myname());
315
316   if (argc == 3) {
317     create = 1;
318   } else {
319     other_host = xbt_strdup(argv[3]);
320     other_port = atoi(argv[4]);
321   }
322
323   globals->sock = gras_socket_server(globals->port);
324   gras_os_sleep(1.0);
325
326   register_messages();
327
328   globals->finger = (finger_elem *) calloc(1, sizeof(finger_elem));
329   XBT_INFO("Launching node %s:%d", globals->host, globals->port);
330   if (create) {
331     XBT_INFO("→Creating ring");
332     globals->finger[0].id = globals->id;
333     snprintf(globals->finger[0].host, 1024, globals->host);
334     globals->finger[0].port = globals->port;
335   } else {
336     XBT_INFO("→Known node %s:%d", other_host, other_port);
337     XBT_INFO("→Contacting to determine successor");
338     TRY {
339       temp_sock = gras_socket_client(other_host, other_port);
340     }
341     CATCH_ANONYMOUS {
342       RETHROWF("Unable to contact known host: %s");
343     }
344
345     get_suc_msg.id = globals->id;
346     TRY {
347       gras_msg_send(temp_sock, "chord_get_suc", &get_suc_msg);
348     }
349     CATCH_ANONYMOUS {
350       gras_socket_close(temp_sock);
351       RETHROWF("Unable to contact known host to get successor!: %s");
352     }
353
354     TRY {
355       XBT_INFO("Waiting for reply!");
356       gras_msg_wait(10., "chord_rep_suc", &temp_sock2, &rep_suc_msg);
357     }
358     CATCH_ANONYMOUS {
359       RETHROWF("%s: Error waiting for successor:%s", globals->host);
360     }
361     globals->finger[0].id = rep_suc_msg.id;
362     snprintf(globals->finger[0].host, 1024, rep_suc_msg.host);
363     globals->finger[0].port = rep_suc_msg.port;
364     XBT_INFO("→Got successor : %d-%s:%d", globals->finger[0].id,
365           globals->finger[0].host, globals->finger[0].port);
366     gras_socket_close(temp_sock);
367     TRY {
368       temp_sock = gras_socket_client(globals->finger[0].host,
369                                      globals->finger[0].port);
370     }
371     CATCH_ANONYMOUS {
372       RETHROWF("Unable to contact successor: %s");
373     }
374
375     notify_msg.id = globals->id;
376     snprintf(notify_msg.host, 1024, globals->host);
377     notify_msg.port = globals->port;
378     TRY {
379       gras_msg_send(temp_sock, "chord_notify", &notify_msg);
380     }
381     CATCH_ANONYMOUS {
382       RETHROWF("Unable to notify successor! %s");
383     }
384   }
385
386   gras_cb_register("chord_get_suc", &node_cb_get_suc_handler);
387   gras_cb_register("chord_notify", &node_cb_notify_handler);
388   /*gras_cb_register("chord_ping",&node_cb_ping_handler); */
389   /* gras_timer_repeat(600.,fix_fingers); */
390   /*while(1){ */
391
392   for (l = 0; l < 50; l++) {
393     TRY {
394       gras_msg_handle(6000000.0);
395     }
396     CATCH(e) {
397       xbt_ex_free(e);
398     }
399   }
400   /*} */
401
402   gras_socket_close(globals->sock);
403   free(globals);
404   gras_exit();
405   XBT_INFO("Done");
406   return (0);
407 }