Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Check return value of (v)asprintf.
[simgrid.git] / examples / gras / p2p / chord / chord.c
1 /* Copyright (c) 2006, 2007, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "xbt/sysdep.h"
9 #include "gras.h"
10
11 static int closest_preceding_node(int id);
12 static void check_predecessor(void);
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(chord, "Messages specific to this example");
15
16 typedef enum msg_typus {
17   PING,
18   PONG,
19   GET_PRE,
20   REP_PRE,
21   GET_SUC,
22   REP_SUC,
23   STD,
24 } msg_typus;
25
26 /*GRAS_DEFINE_TYPE(s_pbio,
27   struct s_pbio{
28     msg_typus type;
29     int dest;
30     char msg[1024];
31   };
32 );
33 typedef struct s_pbio pbio_t;*/
34
35 /*GRAS_DEFINE_TYPE(s_ping,*/
36 struct s_ping {
37   int id;
38 };
39 /*);*/
40 typedef struct s_ping ping_t;
41
42 /*GRAS_DEFINE_TYPE(s_pong,*/
43 struct s_pong {
44   int id;
45   int failed;
46 };
47 /*);*/
48 typedef struct s_pong pong_t;
49
50 GRAS_DEFINE_TYPE(s_notify, struct s_notify {
51                  int id; char host[1024]; int port;};);
52
53 typedef struct s_notify notify_t;
54
55 GRAS_DEFINE_TYPE(s_get_suc, struct s_get_suc {
56                  int id;};);
57
58 typedef struct s_get_suc get_suc_t;
59
60 GRAS_DEFINE_TYPE(s_rep_suc, struct s_rep_suc {
61                  int id; char host[1024]; int port;};);
62
63 typedef struct s_rep_suc rep_suc_t;
64
65 typedef struct finger_elem {
66   int id;
67   char host[1024];
68   int port;
69 } finger_elem;
70
71
72
73 static void register_messages()
74 {
75 /*  gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
76   gras_msgtype_declare("chord_get_suc",
77                        gras_datadesc_by_symbol(s_get_suc));
78   gras_msgtype_declare("chord_rep_suc",
79                        gras_datadesc_by_symbol(s_rep_suc));
80   gras_msgtype_declare("chord_notify", gras_datadesc_by_symbol(s_notify));
81 }
82
83 /* Global private data */
84 typedef struct {
85   gras_socket_t sock;           /* server socket on which I'm listening */
86   int id;                       /* my id number */
87   char host[1024];              /* my host name */
88   int port;                     /* port on which I'm listening FIXME */
89   int fingers;                  /* how many fingers */
90   finger_elem *finger;          /* finger table */
91   int next_to_fix;              /* next in the finger list to be checked */
92   int pre_id;                   /* predecessor id */
93   char pre_host[1024];          /* predecessor host */
94   int pre_port;                 /* predecessor port */
95 } node_data_t;
96
97
98 int node(int argc, char **argv);
99
100 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
101   xbt_ex_t e;
102   pbio_t pbio_i=*(pbio_t*)payload_data;
103
104   node_data_t *globals=(node_data_t*)gras_userdata_get();
105
106   INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
107
108
109
110 }*/
111
112 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,
113                                    void *payload_data)
114 {
115   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
116   xbt_ex_t e;
117   get_suc_t incoming = *(get_suc_t *) payload_data;
118   rep_suc_t outgoing;
119   node_data_t *globals = (node_data_t *) gras_userdata_get();
120   INFO2("Received a get_successor message from %s for %d",
121         gras_socket_peer_name(expeditor), incoming.id);
122   if ((globals->id == globals->finger[0].id) ||
123       (incoming.id > globals->id
124        && incoming.id <= globals->finger[0].id)) {
125     outgoing.id = globals->finger[0].id;
126     snprintf(outgoing.host, 1024, globals->finger[0].host);
127     outgoing.port = globals->finger[0].port;
128     INFO0("My successor is his successor!");
129   } else {
130     gras_socket_t temp_sock;
131     int contact = closest_preceding_node(incoming.id);
132     if (contact == -1) {
133       outgoing.id = globals->finger[0].id;
134       snprintf(outgoing.host, 1024, globals->finger[0].host);
135       outgoing.port = globals->finger[0].port;
136       INFO0("My successor is his successor!");
137     } else {
138       get_suc_t asking;
139       asking.id = incoming.id;
140       TRY {
141         temp_sock = gras_socket_client(globals->finger[contact].host,
142                                        globals->finger[contact].port);
143       }
144       CATCH(e) {
145         RETHROW0("Unable to connect!: %s");
146       }
147       TRY {
148         gras_msg_send(temp_sock, "chord_get_suc", &asking);
149       }
150       CATCH(e) {
151         RETHROW0("Unable to ask!: %s");
152       }
153       gras_msg_wait(10., "chord_rep_suc", &temp_sock, &outgoing);
154     }
155   }
156
157   TRY {
158     gras_msg_send(expeditor, "chord_rep_suc", &outgoing);
159     INFO0("Successor information sent!");
160   }
161   CATCH(e) {
162     RETHROW2("%s:Timeout sending successor information to %s: %s",
163              globals->host, gras_socket_peer_name(expeditor));
164   }
165   gras_socket_close(expeditor);
166   return 0;
167 }
168
169 static int closest_preceding_node(int id)
170 {
171   node_data_t *globals = (node_data_t *) gras_userdata_get();
172   int i;
173   for (i = globals->fingers - 1; i >= 0; i--) {
174     if (globals->finger[i].id > globals->id && globals->finger[i].id < id) {
175       return (i);
176     }
177   }
178
179   return i;
180 }
181
182 static int node_cb_notify_handler(gras_msg_cb_ctx_t ctx,
183                                   void *payload_data)
184 {
185   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
186   /*xbt_ex_t e; */
187   notify_t incoming = *(notify_t *) payload_data;
188   node_data_t *globals = (node_data_t *) gras_userdata_get();
189   INFO2("Received a notifying message from %s as %d",
190         gras_socket_peer_name(expeditor), incoming.id);
191   if (globals->pre_id == -1 ||
192       (incoming.id > globals->pre_id && incoming.id < globals->id)) {
193     globals->pre_id = incoming.id;
194     snprintf(globals->pre_host, 1024, incoming.host);
195     globals->pre_port = incoming.port;
196     INFO0("Set as my new predecessor!");
197   }
198   return 0;
199 }
200
201 static void fix_fingers()
202 {
203   get_suc_t get_suc_msg;
204   xbt_ex_t e;
205   gras_socket_t temp_sock = NULL;
206   gras_socket_t temp_sock2 = NULL;
207   rep_suc_t rep_suc_msg;
208   node_data_t *globals = (node_data_t *) gras_userdata_get();
209
210   TRY {
211     temp_sock = gras_socket_client(globals->host, globals->port);
212   } CATCH(e) {
213     RETHROW0("Unable to contact known host: %s");
214   }
215
216   get_suc_msg.id = globals->id;
217   TRY {
218     gras_msg_send(temp_sock, "chord_get_suc", &get_suc_msg);
219   } CATCH(e) {
220     gras_socket_close(temp_sock);
221     RETHROW0("Unable to contact known host to get successor!: %s");
222   }
223
224   TRY {
225     INFO0("Waiting for reply!");
226     gras_msg_wait(6000, "chord_rep_suc", &temp_sock2, &rep_suc_msg);
227   } CATCH(e) {
228     RETHROW1("%s: Error waiting for successor:%s", globals->host);
229   }
230   globals->finger[0].id = rep_suc_msg.id;
231   snprintf(globals->finger[0].host, 1024, rep_suc_msg.host);
232   globals->finger[0].port = rep_suc_msg.port;
233   INFO1("→ Finger %d fixed!", globals->next_to_fix);
234   gras_socket_close(temp_sock);
235
236   globals->next_to_fix = (++globals->next_to_fix == globals->fingers) ?
237       0 : globals->next_to_fix;
238 }
239
240 static void check_predecessor()
241 {
242   node_data_t *globals = (node_data_t *) gras_userdata_get();
243   gras_socket_t temp_sock;
244   ping_t ping;
245   pong_t pong;
246   xbt_ex_t e;
247   if (globals->pre_id == -1) {
248     return;
249   }
250   TRY {
251     temp_sock = gras_socket_client(globals->pre_host, globals->pre_port);
252   }
253   CATCH(e) {
254     globals->pre_id = -1;
255     globals->pre_host[0] = 0;
256     globals->pre_port = 0;
257   }
258
259   ping.id = 0;
260   TRY {
261     gras_msg_send(temp_sock, "chord_ping", &ping);
262   }
263   CATCH(e) {
264     globals->pre_id = -1;
265     globals->pre_host[0] = 0;
266     globals->pre_port = 0;
267   }
268   TRY {
269     gras_msg_wait(60, "chord_pong", &temp_sock, &pong);
270   }
271   CATCH(e) {
272     globals->pre_id = -1;
273     globals->pre_host[0] = 0;
274     globals->pre_port = 0;
275   }
276   gras_socket_close(temp_sock);
277 }
278
279 int node(int argc, char **argv)
280 {
281   node_data_t *globals = NULL;
282   gras_socket_t temp_sock = NULL;
283   gras_socket_t temp_sock2 = NULL;
284   get_suc_t get_suc_msg;
285   rep_suc_t rep_suc_msg;
286
287   xbt_ex_t e;
288
289   int create = 0;
290   int other_port = -1;
291   char *other_host;
292   notify_t notify_msg;
293   int l;
294
295   /* 1. Init the GRAS infrastructure and declare my globals */
296   gras_init(&argc, argv);
297
298   gras_os_sleep((15 - gras_os_getpid()) * 20);
299
300   globals = gras_userdata_new(node_data_t);
301
302   globals->id = atoi(argv[1]);
303   globals->port = atoi(argv[2]);
304   globals->fingers = 0;
305   globals->finger = NULL;
306   globals->pre_id = -1;
307   globals->pre_host[0] = 0;
308   globals->pre_port = -1;
309
310   snprintf(globals->host, 1024, gras_os_myname());
311
312   if (argc == 3) {
313     create = 1;
314   } else {
315     other_host = xbt_strdup(argv[3]);
316     other_port = atoi(argv[4]);
317   }
318
319   globals->sock = gras_socket_server(globals->port);
320   gras_os_sleep(1.0);
321
322   register_messages();
323
324   globals->finger = (finger_elem *) calloc(1, sizeof(finger_elem));
325   INFO2("Launching node %s:%d", globals->host, globals->port);
326   if (create) {
327     INFO0("→Creating ring");
328     globals->finger[0].id = globals->id;
329     snprintf(globals->finger[0].host, 1024, globals->host);
330     globals->finger[0].port = globals->port;
331   } else {
332     INFO2("→Known node %s:%d", other_host, other_port);
333     INFO0("→Contacting to determine successor");
334     TRY {
335       temp_sock = gras_socket_client(other_host, other_port);
336     }
337     CATCH(e) {
338       RETHROW0("Unable to contact known host: %s");
339     }
340
341     get_suc_msg.id = globals->id;
342     TRY {
343       gras_msg_send(temp_sock, "chord_get_suc", &get_suc_msg);
344     }
345     CATCH(e) {
346       gras_socket_close(temp_sock);
347       RETHROW0("Unable to contact known host to get successor!: %s");
348     }
349
350     TRY {
351       INFO0("Waiting for reply!");
352       gras_msg_wait(10., "chord_rep_suc", &temp_sock2, &rep_suc_msg);
353     }
354     CATCH(e) {
355       RETHROW1("%s: Error waiting for successor:%s", globals->host);
356     }
357     globals->finger[0].id = rep_suc_msg.id;
358     snprintf(globals->finger[0].host, 1024, rep_suc_msg.host);
359     globals->finger[0].port = rep_suc_msg.port;
360     INFO3("→Got successor : %d-%s:%d", globals->finger[0].id,
361           globals->finger[0].host, globals->finger[0].port);
362     gras_socket_close(temp_sock);
363     TRY {
364       temp_sock = gras_socket_client(globals->finger[0].host,
365                                      globals->finger[0].port);
366     }
367     CATCH(e) {
368       RETHROW0("Unable to contact successor: %s");
369     }
370
371     notify_msg.id = globals->id;
372     snprintf(notify_msg.host, 1024, globals->host);
373     notify_msg.port = globals->port;
374     TRY {
375       gras_msg_send(temp_sock, "chord_notify", &notify_msg);
376     }
377     CATCH(e) {
378       RETHROW0("Unable to notify successor! %s");
379     }
380   }
381
382   gras_cb_register("chord_get_suc", &node_cb_get_suc_handler);
383   gras_cb_register("chord_notify", &node_cb_notify_handler);
384   /*gras_cb_register("chord_ping",&node_cb_ping_handler); */
385   /* gras_timer_repeat(600.,fix_fingers); */
386   /*while(1){ */
387
388   for (l = 0; l < 50; l++) {
389     TRY {
390       gras_msg_handle(6000000.0);
391     }
392     CATCH(e) {
393     }
394   }
395   /*} */
396
397   gras_socket_close(globals->sock);
398   free(globals);
399   gras_exit();
400   INFO0("Done");
401   return (0);
402 }