3 /* amok peer management - servers main loop and remote peer stopping */
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/sysdep.h"
12 #include "amok/peermanagement.h"
14 #include "amok/amok_modinter.h" /* prototype of my module declaration */
15 #include "gras/module.h" /* module mecanism */
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm, amok, "peer management");
21 int amok_pm_moddata_id = -1;
25 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
27 /* Payload of join message */
31 } s_amok_pm_msg_join_t, *amok_pm_msg_join_t;
33 /* Message callbacks */
34 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
37 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
42 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx, void *payload_data)
45 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
47 gras_msg_rpcreturn(30, ctx, NULL);
51 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload)
53 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
54 char *name = *(void **) payload;
55 xbt_dynar_t res = xbt_dict_get(g->groups, name);
57 gras_msg_rpcreturn(30, ctx, &res);
61 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload)
63 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
64 amok_pm_msg_join_t msg = *(amok_pm_msg_join_t *) payload;
65 xbt_dynar_t group = xbt_dict_get(g->groups, msg->group);
67 gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
68 xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
69 gras_socket_peer_port(exp));
70 xbt_peer_t previous = NULL;
72 if (msg->rank >= 0 && xbt_dynar_length(group) >= msg->rank + 1)
73 xbt_dynar_get_cpy(group, msg->rank, &previous);
75 VERB3("Contacted by %s:%d for rank %d", dude->name, dude->port, msg->rank);
77 xbt_dynar_push(group, &dude);
81 "You wanted to get rank %d of group %s, but %s:%d is already there",
82 msg->rank, msg->group, previous->name, previous->port);
83 xbt_dynar_set(group, msg->rank, &dude);
86 gras_msg_rpcreturn(10, ctx, NULL);
92 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload)
94 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
95 char *name = *(void **) payload;
96 xbt_dynar_t group = xbt_dict_get(g->groups, name);
98 gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
99 xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
100 gras_socket_peer_port(exp));
105 xbt_dynar_foreach(group, cpt, peer_it) {
106 if (!strcmp(peer_it->name, dude->name) && peer_it->port == dude->port) {
107 xbt_dynar_cursor_rm(group, &cpt);
111 WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
112 dude->name, dude->port, name);
115 gras_msg_rpcreturn(30, ctx, NULL);
119 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload)
121 char *name = *(void **) payload;
122 amok_pm_group_shutdown(name);
124 gras_msg_rpcreturn(30, ctx, NULL);
128 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
129 void amok_pm_mainloop(double timeOut)
131 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
134 gras_msg_handle(timeOut);
138 /** \brief kill a buddy identified by its peername and port. Note that it is not removed from any group it may belong to. */
139 void amok_pm_kill_hp(char *name, int port)
141 gras_socket_t sock = gras_socket_client(name, port);
143 gras_socket_close(sock);
146 /** \brief kill a buddy to which we have a socket already. Note that it is not removed from any group it may belong to. */
147 void amok_pm_kill(gras_socket_t buddy)
149 gras_msg_send(buddy, "amok_pm_kill", NULL);
152 /** \brief kill syncronously a buddy (do not return before its death). Note that it is not removed from any group it may belong to. */
153 void amok_pm_kill_sync(gras_socket_t buddy)
155 gras_msg_rpccall(buddy, 30, "amok_pm_killrpc", NULL, NULL);
159 /** \brief create a new peermanagement group located on local peer
161 * The dynar elements are of type xbt_peer_t
163 xbt_dynar_t amok_pm_group_new(const char *group_name)
166 xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
167 xbt_peer_free_voidp);
169 xbt_assert0(amok_pm_moddata_id != -1, "Run amok_pm_init first!");
170 g = gras_moddata_by_id(amok_pm_moddata_id);
172 DEBUG1("retrieved groups=%p", g->groups);
174 xbt_dict_set(g->groups, group_name, res, NULL); /*FIXME: leaking xbt_dynar_free_voidp); */
175 VERB1("Group %s created", group_name);
180 /** \brief retrieve all members of the given remote group */
181 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name)
185 gras_msg_rpccall(master, 30, "amok_pm_get", &group_name, &res);
189 /** \brief add current peer to the given remote group
191 * You should provide rank only when you want to force the order of members
192 * (for example in strict test cases). Give -1 to leave the system choose it for you.
194 void amok_pm_group_join(gras_socket_t master, const char *group_name,
197 amok_pm_msg_join_t msg = xbt_new(s_amok_pm_msg_join_t, 1);
198 msg->group = (char *) group_name;
200 VERB4("Join group '%s' on %s:%d (at rank %d)",
201 group_name, gras_socket_peer_name(master),
202 gras_socket_peer_port(master), rank);
203 gras_msg_rpccall(master, 30, "amok_pm_join", &msg, NULL);
205 VERB3("Joined group '%s' on %s:%d",
206 group_name, gras_socket_peer_name(master),
207 gras_socket_peer_port(master));
210 /** \brief remove current peer from the given remote group if found
212 * If not found, call is ignored
214 void amok_pm_group_leave(gras_socket_t master, const char *group_name)
216 gras_msg_rpccall(master, 30, "amok_pm_leave", &group_name, NULL);
217 VERB3("Leaved group '%s' on %s:%d",
218 group_name, gras_socket_peer_name(master),
219 gras_socket_peer_port(master));
222 /** \brief stops all members of the given local group */
223 void amok_pm_group_shutdown(const char *group_name)
225 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
226 xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
231 xbt_dynar_foreach(group, cpt, peer_it) {
232 amok_pm_kill_hp(peer_it->name, peer_it->port);
235 xbt_dynar_free(&group);
236 xbt_dict_remove(g->groups, group_name);
239 /** \brief stops all members of the given remote group */
240 void amok_pm_group_shutdown_remote(gras_socket_t master,
241 const char *group_name)
243 gras_msg_rpccall(master, 30, "amok_pm_shutdown", &group_name, NULL);
249 * * Module management functions
255 static void _amok_pm_init(void)
257 /* no world-wide globals */
258 /* Datatype and message declarations */
259 gras_datadesc_type_t pm_group_type =
260 gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"),
261 xbt_peer_free_voidp);
263 gras_datadesc_type_t msg_join_t =
264 gras_datadesc_struct("s_amok_pm_moddata_t");
265 gras_datadesc_struct_append(msg_join_t, "group",
266 gras_datadesc_by_name("string"));
267 gras_datadesc_struct_append(msg_join_t, "rank",
268 gras_datadesc_by_name("int"));
269 gras_datadesc_struct_close(msg_join_t);
271 gras_datadesc_ref("amok_pm_moddata_t",
272 gras_datadesc_by_name("s_amok_pm_moddata_t"));
274 gras_msgtype_declare("amok_pm_kill", NULL);
275 gras_msgtype_declare_rpc("amok_pm_killrpc", NULL, NULL);
277 gras_msgtype_declare_rpc("amok_pm_get",
278 gras_datadesc_by_name("string"), pm_group_type);
279 gras_msgtype_declare_rpc("amok_pm_join", msg_join_t, NULL);
280 gras_msgtype_declare_rpc("amok_pm_leave",
281 gras_datadesc_by_name("string"), NULL);
283 gras_msgtype_declare_rpc("amok_pm_shutdown",
284 gras_datadesc_by_name("string"), NULL);
287 static void _amok_pm_join(void *p)
289 /* moddata management */
290 amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
295 mod->groups = xbt_dict_new();
298 gras_cb_register("amok_pm_kill", &amok_pm_cb_kill);
299 gras_cb_register("amok_pm_killrpc", &amok_pm_cb_killrpc);
301 gras_cb_register("amok_pm_get", &amok_pm_cb_get);
302 gras_cb_register("amok_pm_join", &amok_pm_cb_join);
303 gras_cb_register("amok_pm_leave", &amok_pm_cb_leave);
304 gras_cb_register("amok_pm_shutdown", &amok_pm_cb_shutdown);
307 static void _amok_pm_exit(void)
309 /* no world-wide globals */
312 static void _amok_pm_leave(void *p)
315 amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
318 xbt_dict_free(&mod->groups);
321 gras_cb_unregister("amok_pm_kill", &amok_pm_cb_kill);
322 gras_cb_unregister("amok_pm_killrpc", &amok_pm_cb_killrpc);
324 gras_cb_unregister("amok_pm_get", &amok_pm_cb_get);
325 gras_cb_unregister("amok_pm_join", &amok_pm_cb_join);
326 gras_cb_unregister("amok_pm_leave", &amok_pm_cb_leave);
327 gras_cb_unregister("amok_pm_shutdown", &amok_pm_cb_shutdown);
330 void amok_pm_modulecreate()
332 gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t), &amok_pm_moddata_id,
333 _amok_pm_init, _amok_pm_exit, _amok_pm_join,
341 * * Old module functions (kept for compatibility)
344 /** \brief Initialize the peer management module. Every process must run it before use */
347 gras_module_join("amok_pm");
350 /** \brief Finalize the peer management module. Every process should run it after use */
353 gras_module_leave("amok_pm");