Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
dc8219de8a7c986b229fdec84a0d5d6ef52c42a0
[simgrid.git] / src / amok / PeerManagement / peermanagement.c
1 /* amok peer management - servers main loop and remote peer stopping        */
2
3 /* Copyright (c) 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "xbt/sysdep.h"
10 #include "xbt/peer.h"
11 #include "amok/peermanagement.h"
12
13 #include "amok/amok_modinter.h" /* prototype of my module declaration */
14 #include "gras/module.h"        /* module mecanism */
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm, amok, "peer management");
17
18
19 /* data management */
20 int amok_pm_moddata_id = -1;
21 typedef struct {
22   int done;
23   xbt_dict_t groups;
24 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
25
26 /* Message callbacks */
27 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
28 {
29
30   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
31   g->done = 1;
32   return 0;
33 }
34
35 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx, void *payload_data)
36 {
37
38   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
39   g->done = 1;
40   gras_msg_rpcreturn(30, ctx, NULL);
41   return 0;
42 }
43
44 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload)
45 {
46   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
47   char *name = *(void **) payload;
48   xbt_dynar_t res = xbt_dict_get(g->groups, name);
49
50   gras_msg_rpcreturn(30, ctx, &res);
51   return 0;
52 }
53
54 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload)
55 {
56   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
57   char* group_name = *(char* *) payload;
58   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
59   int rank;
60
61   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
62   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
63                                  gras_socket_peer_port(exp));
64
65   rank = xbt_dynar_length(group);
66   xbt_dynar_push(group, &dude);
67   VERB3("Contacted by %s:%d. Give it rank #%d", dude->name, dude->port,rank);
68
69   gras_msg_rpcreturn(10, ctx, &rank);
70   free(group_name);
71   return 0;
72 }
73
74 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload)
75 {
76   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
77   char *name = *(void **) payload;
78   xbt_dynar_t group = xbt_dict_get(g->groups, name);
79
80   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
81   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
82                                  gras_socket_peer_port(exp));
83
84   unsigned int cpt;
85   xbt_peer_t peer_it;
86
87   xbt_dynar_foreach(group, cpt, peer_it) {
88     if (!strcmp(peer_it->name, dude->name) && peer_it->port == dude->port) {
89       xbt_dynar_cursor_rm(group, &cpt);
90       goto end;
91     }
92   }
93   WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
94         dude->name, dude->port, name);
95
96 end:
97   gras_msg_rpcreturn(30, ctx, NULL);
98   return 0;
99 }
100
101 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload)
102 {
103   char *name = *(void **) payload;
104   amok_pm_group_shutdown(name);
105
106   gras_msg_rpcreturn(30, ctx, NULL);
107   return 0;
108 }
109
110 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
111 void amok_pm_mainloop(double timeOut)
112 {
113   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
114
115   while (!g->done) {
116     gras_msg_handle(timeOut);
117   }
118 }
119
120 /** \brief kill a buddy identified by its peername and port. Note that it is not removed from any group it may belong to. */
121 void amok_pm_kill_hp(char *name, int port)
122 {
123   gras_socket_t sock = gras_socket_client(name, port);
124   amok_pm_kill(sock);
125   gras_socket_close(sock);
126 }
127
128 /** \brief kill a buddy to which we have a socket already. Note that it is not removed from any group it may belong to. */
129 void amok_pm_kill(gras_socket_t buddy)
130 {
131   gras_msg_send(buddy, "amok_pm_kill", NULL);
132 }
133
134 /** \brief kill syncronously a buddy (do not return before its death). Note that it is not removed from any group it may belong to. */
135 void amok_pm_kill_sync(gras_socket_t buddy)
136 {
137   gras_msg_rpccall(buddy, 30, "amok_pm_killrpc", NULL, NULL);
138 }
139
140
141 /** \brief create a new peermanagement group located on local peer 
142  *
143  * The dynar elements are of type xbt_peer_t
144  */
145 xbt_dynar_t amok_pm_group_new(const char *group_name)
146 {
147   amok_pm_moddata_t g;
148   xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
149                                   xbt_peer_free_voidp);
150
151   xbt_assert0(amok_pm_moddata_id != -1, "Run amok_pm_init first!");
152   g = gras_moddata_by_id(amok_pm_moddata_id);
153
154   DEBUG1("retrieved groups=%p", g->groups);
155
156   xbt_dict_set(g->groups, group_name, res, NULL);       /*FIXME: leaking xbt_dynar_free_voidp); */
157   VERB1("Group %s created", group_name);
158
159   return res;
160 }
161
162 /** \brief retrieve all members of the given remote group */
163 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name)
164 {
165   xbt_dynar_t res;
166
167   gras_msg_rpccall(master, 30, "amok_pm_get", &group_name, &res);
168   return res;
169 }
170
171 /** \brief add current peer to the given remote group 
172  *
173  * Returns the rank of the process in the group.
174  */
175 int amok_pm_group_join(gras_socket_t master, const char *group_name)
176 {
177   int rank;
178   VERB3("Join group '%s' on %s:%d",
179         group_name, gras_socket_peer_name(master),
180         gras_socket_peer_port(master));
181   gras_msg_rpccall(master, 30, "amok_pm_join", &group_name, &rank);
182   VERB4("Joined group '%s' on %s:%d. Got rank %d",
183         group_name, gras_socket_peer_name(master),
184         gras_socket_peer_port(master),
185         rank);
186   return rank;
187 }
188
189 /** \brief remove current peer from the given remote group if found
190  *
191  * If not found, call is ignored 
192  */
193 void amok_pm_group_leave(gras_socket_t master, const char *group_name)
194 {
195   gras_msg_rpccall(master, 30, "amok_pm_leave", &group_name, NULL);
196   VERB3("Leaved group '%s' on %s:%d",
197         group_name, gras_socket_peer_name(master),
198         gras_socket_peer_port(master));
199 }
200
201 /** \brief stops all members of the given local group */
202 void amok_pm_group_shutdown(const char *group_name)
203 {
204   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
205   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
206
207   unsigned int cpt;
208   xbt_peer_t peer_it;
209
210   xbt_dynar_foreach(group, cpt, peer_it) {
211     amok_pm_kill_hp(peer_it->name, peer_it->port);
212   }
213
214   xbt_dynar_free(&group);
215   xbt_dict_remove(g->groups, group_name);
216 }
217
218 /** \brief stops all members of the given remote group */
219 void amok_pm_group_shutdown_remote(gras_socket_t master,
220                                    const char *group_name)
221 {
222   gras_msg_rpccall(master, 30, "amok_pm_shutdown", &group_name, NULL);
223 }
224
225
226 /* *
227  * *
228  * * Module management functions
229  * *
230  * */
231
232
233
234 static void _amok_pm_init(void)
235 {
236   /* no world-wide globals */
237   /* Datatype and message declarations */
238   gras_datadesc_type_t pm_group_type =
239     gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"),
240                         xbt_peer_free_voidp);
241
242   gras_msgtype_declare("amok_pm_kill", NULL);
243   gras_msgtype_declare_rpc("amok_pm_killrpc", NULL, NULL);
244
245   gras_msgtype_declare_rpc("amok_pm_get",
246                            gras_datadesc_by_name("string"), pm_group_type);
247   gras_msgtype_declare_rpc("amok_pm_join", gras_datadesc_by_name("string"), gras_datadesc_by_name("int"));
248   gras_msgtype_declare_rpc("amok_pm_leave",
249                            gras_datadesc_by_name("string"), NULL);
250
251   gras_msgtype_declare_rpc("amok_pm_shutdown",
252                            gras_datadesc_by_name("string"), NULL);
253 }
254
255 static void _amok_pm_join(void *p)
256 {
257   /* moddata management */
258   amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
259
260   mod->groups = NULL;
261
262   mod->done = 0;
263   mod->groups = xbt_dict_new();
264
265   /* callbacks */
266   gras_cb_register("amok_pm_kill", &amok_pm_cb_kill);
267   gras_cb_register("amok_pm_killrpc", &amok_pm_cb_killrpc);
268
269   gras_cb_register("amok_pm_get", &amok_pm_cb_get);
270   gras_cb_register("amok_pm_join", &amok_pm_cb_join);
271   gras_cb_register("amok_pm_leave", &amok_pm_cb_leave);
272   gras_cb_register("amok_pm_shutdown", &amok_pm_cb_shutdown);
273 }
274
275 static void _amok_pm_exit(void)
276 {
277   /* no world-wide globals */
278 }
279
280 static void _amok_pm_leave(void *p)
281 {
282   /* moddata */
283   amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
284
285   if (mod->groups)
286     xbt_dict_free(&mod->groups);
287
288   /* callbacks */
289   gras_cb_unregister("amok_pm_kill", &amok_pm_cb_kill);
290   gras_cb_unregister("amok_pm_killrpc", &amok_pm_cb_killrpc);
291
292   gras_cb_unregister("amok_pm_get", &amok_pm_cb_get);
293   gras_cb_unregister("amok_pm_join", &amok_pm_cb_join);
294   gras_cb_unregister("amok_pm_leave", &amok_pm_cb_leave);
295   gras_cb_unregister("amok_pm_shutdown", &amok_pm_cb_shutdown);
296 }
297
298 void amok_pm_modulecreate()
299 {
300   gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t), &amok_pm_moddata_id,
301                   _amok_pm_init, _amok_pm_exit, _amok_pm_join,
302                   _amok_pm_leave);
303 }
304
305
306
307 /* *
308  * *
309  * * Old module functions (kept for compatibility)
310  * *
311  * */
312 /** \brief Initialize the peer management module. Every process must run it before use */
313 void amok_pm_init()
314 {
315   gras_module_join("amok_pm");
316 }
317
318 /** \brief Finalize the peer management module. Every process should run it after use */
319 void amok_pm_exit()
320 {
321   gras_module_leave("amok_pm");
322 }