1 /* amok peer management - servers main loop and remote peer stopping */
3 /* Copyright (c) 2006 Martin Quinson. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "xbt/sysdep.h"
10 #include "amok/peermanagement.h"
12 #include "amok/amok_modinter.h" /* prototype of my module declaration */
13 #include "gras/module.h" /* module mecanism */
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm, amok, "peer management");
19 int amok_pm_moddata_id = -1;
23 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
25 /* Message callbacks */
26 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
29 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
34 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx, void *payload_data)
37 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
39 gras_msg_rpcreturn(30, ctx, NULL);
43 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload)
45 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
46 char *name = *(void **) payload;
47 xbt_dynar_t res = xbt_dict_get(g->groups, name);
49 gras_msg_rpcreturn(30, ctx, &res);
53 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload)
55 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
56 char* group_name = *(char* *) payload;
57 xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
60 gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
61 xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
62 gras_socket_peer_port(exp));
64 rank = xbt_dynar_length(group);
65 xbt_dynar_push(group, &dude);
66 VERB3("Contacted by %s:%d. Give it rank #%d", dude->name, dude->port,rank);
68 gras_msg_rpcreturn(10, ctx, &rank);
73 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload)
75 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
76 char *name = *(void **) payload;
77 xbt_dynar_t group = xbt_dict_get(g->groups, name);
79 gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
80 xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
81 gras_socket_peer_port(exp));
86 xbt_dynar_foreach(group, cpt, peer_it) {
87 if (!strcmp(peer_it->name, dude->name) && peer_it->port == dude->port) {
88 xbt_dynar_cursor_rm(group, &cpt);
92 WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
93 dude->name, dude->port, name);
96 gras_msg_rpcreturn(30, ctx, NULL);
100 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload)
102 char *name = *(void **) payload;
103 amok_pm_group_shutdown(name);
105 gras_msg_rpcreturn(30, ctx, NULL);
109 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
110 void amok_pm_mainloop(double timeOut)
112 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
115 gras_msg_handle(timeOut);
119 /** \brief kill a buddy identified by its peername and port. Note that it is not removed from any group it may belong to. */
120 void amok_pm_kill_hp(char *name, int port)
122 gras_socket_t sock = gras_socket_client(name, port);
124 gras_socket_close(sock);
127 /** \brief kill a buddy to which we have a socket already. Note that it is not removed from any group it may belong to. */
128 void amok_pm_kill(gras_socket_t buddy)
130 gras_msg_send(buddy, "amok_pm_kill", NULL);
133 /** \brief kill syncronously a buddy (do not return before its death). Note that it is not removed from any group it may belong to. */
134 void amok_pm_kill_sync(gras_socket_t buddy)
136 gras_msg_rpccall(buddy, 30, "amok_pm_killrpc", NULL, NULL);
140 /** \brief create a new peermanagement group located on local peer
142 * The dynar elements are of type xbt_peer_t
144 xbt_dynar_t amok_pm_group_new(const char *group_name)
147 xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
148 xbt_peer_free_voidp);
150 xbt_assert0(amok_pm_moddata_id != -1, "Run amok_pm_init first!");
151 g = gras_moddata_by_id(amok_pm_moddata_id);
153 DEBUG1("retrieved groups=%p", g->groups);
155 xbt_dict_set(g->groups, group_name, res, NULL); /*FIXME: leaking xbt_dynar_free_voidp); */
156 VERB1("Group %s created", group_name);
161 /** \brief retrieve all members of the given remote group */
162 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name)
166 gras_msg_rpccall(master, 30, "amok_pm_get", &group_name, &res);
170 /** \brief add current peer to the given remote group
172 * Returns the rank of the process in the group.
174 int amok_pm_group_join(gras_socket_t master, const char *group_name)
177 VERB3("Join group '%s' on %s:%d",
178 group_name, gras_socket_peer_name(master),
179 gras_socket_peer_port(master));
180 gras_msg_rpccall(master, 30, "amok_pm_join", &group_name, &rank);
181 VERB4("Joined group '%s' on %s:%d. Got rank %d",
182 group_name, gras_socket_peer_name(master),
183 gras_socket_peer_port(master),
188 /** \brief remove current peer from the given remote group if found
190 * If not found, call is ignored
192 void amok_pm_group_leave(gras_socket_t master, const char *group_name)
194 gras_msg_rpccall(master, 30, "amok_pm_leave", &group_name, NULL);
195 VERB3("Leaved group '%s' on %s:%d",
196 group_name, gras_socket_peer_name(master),
197 gras_socket_peer_port(master));
200 /** \brief stops all members of the given local group */
201 void amok_pm_group_shutdown(const char *group_name)
203 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
204 xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
209 xbt_dynar_foreach(group, cpt, peer_it) {
210 amok_pm_kill_hp(peer_it->name, peer_it->port);
213 xbt_dynar_free(&group);
214 xbt_dict_remove(g->groups, group_name);
217 /** \brief stops all members of the given remote group */
218 void amok_pm_group_shutdown_remote(gras_socket_t master,
219 const char *group_name)
221 gras_msg_rpccall(master, 30, "amok_pm_shutdown", &group_name, NULL);
227 * * Module management functions
233 static void _amok_pm_init(void)
235 /* no world-wide globals */
236 /* Datatype and message declarations */
237 gras_datadesc_type_t pm_group_type =
238 gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"),
239 xbt_peer_free_voidp);
241 gras_msgtype_declare("amok_pm_kill", NULL);
242 gras_msgtype_declare_rpc("amok_pm_killrpc", NULL, NULL);
244 gras_msgtype_declare_rpc("amok_pm_get",
245 gras_datadesc_by_name("string"), pm_group_type);
246 gras_msgtype_declare_rpc("amok_pm_join", gras_datadesc_by_name("string"), gras_datadesc_by_name("int"));
247 gras_msgtype_declare_rpc("amok_pm_leave",
248 gras_datadesc_by_name("string"), NULL);
250 gras_msgtype_declare_rpc("amok_pm_shutdown",
251 gras_datadesc_by_name("string"), NULL);
254 static void _amok_pm_join(void *p)
256 /* moddata management */
257 amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
262 mod->groups = xbt_dict_new();
265 gras_cb_register("amok_pm_kill", &amok_pm_cb_kill);
266 gras_cb_register("amok_pm_killrpc", &amok_pm_cb_killrpc);
268 gras_cb_register("amok_pm_get", &amok_pm_cb_get);
269 gras_cb_register("amok_pm_join", &amok_pm_cb_join);
270 gras_cb_register("amok_pm_leave", &amok_pm_cb_leave);
271 gras_cb_register("amok_pm_shutdown", &amok_pm_cb_shutdown);
274 static void _amok_pm_exit(void)
276 /* no world-wide globals */
279 static void _amok_pm_leave(void *p)
282 amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
285 xbt_dict_free(&mod->groups);
288 gras_cb_unregister("amok_pm_kill", &amok_pm_cb_kill);
289 gras_cb_unregister("amok_pm_killrpc", &amok_pm_cb_killrpc);
291 gras_cb_unregister("amok_pm_get", &amok_pm_cb_get);
292 gras_cb_unregister("amok_pm_join", &amok_pm_cb_join);
293 gras_cb_unregister("amok_pm_leave", &amok_pm_cb_leave);
294 gras_cb_unregister("amok_pm_shutdown", &amok_pm_cb_shutdown);
297 void amok_pm_modulecreate()
299 gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t), &amok_pm_moddata_id,
300 _amok_pm_init, _amok_pm_exit, _amok_pm_join,
308 * * Old module functions (kept for compatibility)
311 /** \brief Initialize the peer management module. Every process must run it before use */
314 gras_module_join("amok_pm");
317 /** \brief Finalize the peer management module. Every process should run it after use */
320 gras_module_leave("amok_pm");