3 /* amok peer management - servers main loop and remote peer stopping */
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/sysdep.h"
12 #include "amok/peermanagement.h"
14 #include "amok/amok_modinter.h" /* prototype of my module declaration */
15 #include "gras/module.h" /* module mechanism */
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm,amok,"peer management");
/* Identifier used to fetch this module's data with gras_moddata_by_id().
 * It starts at -1; its address is handed to gras_module_add() in
 * amok_pm_modulecreate(), which presumably stores the real id (confirm).
 * amok_pm_group_new() asserts that it is no longer -1. */
21 int amok_pm_moddata_id=-1;
25 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
28 /* Message callbacks */
/* Callback for the "amok_pm_kill" message.
 * It fetches this module's data through amok_pm_moddata_id.
 * NOTE(review): the rest of the signature and of the body is not visible in
 * this excerpt; presumably it raises the flag that ends amok_pm_mainloop()
 * -- confirm against the full file. */
29 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx,
32 amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
/* Callback for the "amok_pm_killrpc" RPC (registered in _amok_pm_join()).
 * NOTE(review): what is done with the module data `g` is not visible in
 * this excerpt. */
36 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx,
39 amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
/* Answer the RPC with an empty (NULL) payload. The leading 30 is presumably
 * a timeout in seconds -- confirm against the GRAS documentation. */
41 gras_msg_rpcreturn(30,ctx,NULL);
/* Callback for the "amok_pm_get" RPC: looks up the group named by the
 * payload in the module's group dictionary and sends it back to the caller.
 *
 * ctx:     callback context, used to answer the RPC
 * payload: dereferenced as a pointer to the group-name string pointer
 */
45 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload) {
46 amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
47 char *name = *(void**)payload;
/* NOTE(review): what happens for an unknown group name depends on
 * xbt_dict_get() -- confirm. */
48 xbt_dynar_t res = xbt_dict_get(g->groups, name);
/* The answer payload is the address of the dynar handle. */
50 gras_msg_rpcreturn(30, ctx, &res);
/* Callback for the "amok_pm_join" RPC: adds the calling peer to the group
 * named by the payload.
 *
 * ctx:     callback context; also gives access to the requester's socket
 * payload: dereferenced as a pointer to the group-name string pointer
 */
53 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload) {
54 amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
55 char *name = *(void**)payload;
56 xbt_dynar_t group = xbt_dict_get(g->groups, name);
/* Describe the requester by the peer name and port of the socket the
 * request arrived on. */
58 gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
59 xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
60 gras_socket_peer_port(exp));
62 VERB2("Contacted by %s:%d",dude->name,dude->port);
/* The peer handle itself is stored in the group dynar. */
63 xbt_dynar_push(group,&dude);
/* NOTE(review): the first argument is 10 here while every other callback in
 * this file passes 30 -- confirm whether the difference is intended. */
65 gras_msg_rpcreturn(10, ctx, NULL);
/* Callback for the "amok_pm_leave" RPC: removes the calling peer from the
 * group named by the payload.
 *
 * The requester is identified by the peer name and port of the socket the
 * request arrived on. A group member matches when its name compares equal
 * with strcmp() and its port is identical. A warning is logged for a peer
 * that is not in the group (the control flow around the warning is not
 * visible in this excerpt).
 *
 * NOTE(review): `dude` is allocated by xbt_peer_new() only to carry the
 * values to compare against; no matching release is visible in these lines
 * -- confirm it is freed before returning.
 */
69 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload) {
70 amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
71 char *name = *(void**)payload;
72 xbt_dynar_t group = xbt_dict_get(g->groups, name);
74 gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
75 xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
76 gras_socket_peer_port(exp));
81 xbt_dynar_foreach(group, cpt, peer_it) {
82 if (!strcmp(peer_it->name, dude->name) &&
83 peer_it->port == dude->port) {
/* Remove the element under the cursor. */
84 xbt_dynar_cursor_rm (group,&cpt);
88 WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
89 dude->name,dude->port, name);
92 gras_msg_rpcreturn(30, ctx, NULL);
/* Callback for the "amok_pm_shutdown" RPC (registered in _amok_pm_join()):
 * shuts down the local group named by the payload through
 * amok_pm_group_shutdown(), then answers the RPC with an empty payload.
 *
 * payload: dereferenced as a pointer to the group-name string pointer
 */
96 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload) {
97 char *name = *(void**)payload;
98 amok_pm_group_shutdown(name);
100 gras_msg_rpcreturn(30, ctx, NULL);
104 /** \brief Enter the main loop of the program. It won't return until we get a kill message.
 *
 * \param timeOut value handed to gras_msg_handle()
 *
 * NOTE(review): the loop construct and its exit condition are not visible
 * in this excerpt; presumably the loop watches a flag in the module data
 * `g` -- confirm against the full file.
 */
105 void amok_pm_mainloop(double timeOut) {
106 amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
109 gras_msg_handle(timeOut);
113 /** \brief kill a buddy identified by its peername and port
 *
 * Opens a client socket to the given peer and closes it again before
 * returning.
 *
 * \param name peer name given to gras_socket_client()
 * \param port port given to gras_socket_client()
 *
 * NOTE(review): the statement between opening and closing the socket is not
 * visible in this excerpt; presumably it sends the kill message over
 * `sock` -- confirm.
 */
114 void amok_pm_kill_hp(char *name,int port) {
115 gras_socket_t sock=gras_socket_client(name,port);
117 gras_socket_close(sock);
120 /** \brief kill a buddy to which we have a socket already
 *
 * Sends the "amok_pm_kill" message with a NULL payload using
 * gras_msg_send(); unlike amok_pm_kill_sync(), no RPC call is made here.
 *
 * \param buddy socket the message is sent on
 */
121 void amok_pm_kill(gras_socket_t buddy) {
122 gras_msg_send(buddy,gras_msgtype_by_name("amok_pm_kill"),NULL);
125 /** \brief kill synchronously a buddy (do not return before its death)
 *
 * Performs the "amok_pm_killrpc" RPC with NULL as both request and answer
 * arguments. The 30 is presumably a timeout in seconds -- confirm against
 * the GRAS documentation.
 *
 * \param buddy socket the RPC is issued on
 */
126 void amok_pm_kill_sync(gras_socket_t buddy) {
127 gras_msg_rpccall(buddy,30,gras_msgtype_by_name("amok_pm_killrpc"),NULL,NULL);
/* Review notes for amok_pm_group_new():
 * - the dynar is created with xbt_peer_free_voidp as its element free
 *   function, before the module-initialisation assertion is evaluated;
 * - the dynar is stored in the module's group dictionary under group_name
 *   with a NULL free function (see the FIXME on that line), so the
 *   dictionary does not release it; amok_pm_group_shutdown() frees the
 *   dynar explicitly;
 * - the declaration of `g` and the return statement are not visible in this
 *   excerpt; presumably `res` is returned -- confirm.
 */
131 /** \brief create a new peermanagement group located on local peer
133 * The dynar elements are of type xbt_peer_t
135 xbt_dynar_t amok_pm_group_new(const char *group_name) {
137 xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
138 xbt_peer_free_voidp);
140 xbt_assert0(amok_pm_moddata_id != -1,"Run amok_pm_init first!");
141 g=gras_moddata_by_id(amok_pm_moddata_id);
143 DEBUG1("retrieved groups=%p",g->groups);
145 xbt_dict_set(g->groups,group_name,res,NULL); /*FIXME: leaking xbt_dynar_free_voidp);*/
146 VERB1("Group %s created",group_name);
150 /** \brief retrieve all members of the given remote group
 *
 * Issues the "amok_pm_get" RPC on \a master.
 *
 * \param master     socket to the peer hosting the group
 * \param group_name name of the group to retrieve
 *
 * NOTE(review): the RPC's request/answer arguments and the return statement
 * are not visible in this excerpt.
 */
151 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name) {
154 gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_get"),
159 /** \brief add current peer to the given remote group
 *
 * Issues the "amok_pm_join" RPC on \a master and logs at verbose level
 * both before and after the call.
 *
 * \param master     socket to the peer hosting the group
 * \param group_name name of the group to join
 *
 * NOTE(review): the RPC's request/answer arguments are not visible in this
 * excerpt.
 */
160 void amok_pm_group_join(gras_socket_t master, const char *group_name) {
161 VERB3("Join group '%s' on %s:%d",
162 group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
163 gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_join"),
165 VERB3("Joined group '%s' on %s:%d",
166 group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
/* Review notes for amok_pm_group_leave():
 * - issues the "amok_pm_leave" RPC on `master`, then logs the departure at
 *   verbose level with the group name and the master's peer name and port;
 * - the RPC's request/answer arguments are not visible in this excerpt.
 */
168 /** \brief remove current peer from the given remote group if found
170 * If not found, call is ignored
172 void amok_pm_group_leave(gras_socket_t master, const char *group_name) {
173 gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_leave"),
175 VERB3("Leaved group '%s' on %s:%d",
176 group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
179 /** \brief stops all members of the given local group
 *
 * Calls amok_pm_kill_hp() on the name and port of every peer stored in the
 * group, then frees the group's dynar and removes the group's entry from
 * the module's dictionary. The dynar is freed explicitly because
 * amok_pm_group_new() stores it in the dictionary with a NULL free
 * function.
 *
 * \param group_name name of the local group to shut down
 *
 * NOTE(review): what happens for an unknown group name depends on
 * xbt_dict_get() -- confirm.
 */
180 void amok_pm_group_shutdown(const char *group_name) {
181 amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
182 xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
187 xbt_dynar_foreach(group, cpt, peer_it) {
188 amok_pm_kill_hp(peer_it->name, peer_it->port);
191 xbt_dynar_free(&group);
192 xbt_dict_remove(g->groups,group_name);
194 /** \brief stops all members of the given remote group
 *
 * Issues the "amok_pm_shutdown" RPC on \a master; the peer's callback for
 * that RPC calls amok_pm_group_shutdown() on the group name it receives.
 *
 * \param master     socket to the peer hosting the group
 * \param group_name name of the group to shut down
 *
 * NOTE(review): the RPC's request/answer arguments are not visible in this
 * excerpt.
 */
195 void amok_pm_group_shutdown_remote(gras_socket_t master, const char *group_name){
196 gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_shutdown"),
203 * * Module management functions
/* Module hook passed to gras_module_add() in amok_pm_modulecreate() as the
 * first of its four callbacks. Declares the message types this module uses:
 * the plain "amok_pm_kill" message and the RPCs "amok_pm_killrpc",
 * "amok_pm_get", "amok_pm_join", "amok_pm_leave" and "amok_pm_shutdown".
 * NOTE(review): the answer types of the last four RPCs are not visible in
 * this excerpt. */
209 static void _amok_pm_init(void) {
210 /* no world-wide globals */
211 /* Datatype and message declarations */
/* Description of a dynar of xbt_peer_t; its use is not visible in this
 * excerpt (presumably the answer type of "amok_pm_get" -- confirm). */
212 gras_datadesc_type_t pm_group_type = gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"), xbt_peer_free_voidp);
/* The kill message and the kill RPC are declared with NULL datatypes. */
214 gras_msgtype_declare("amok_pm_kill",NULL);
215 gras_msgtype_declare_rpc("amok_pm_killrpc",NULL,NULL);
/* The group RPCs below all take a "string" as their request type. */
217 gras_msgtype_declare_rpc("amok_pm_get",
218 gras_datadesc_by_name("string"),
220 gras_msgtype_declare_rpc("amok_pm_join",
221 gras_datadesc_by_name("string"),
223 gras_msgtype_declare_rpc("amok_pm_leave",
224 gras_datadesc_by_name("string"),
227 gras_msgtype_declare_rpc("amok_pm_shutdown",
228 gras_datadesc_by_name("string"),
/* Module hook passed to gras_module_add() in amok_pm_modulecreate().
 * Creates the (initially empty) group dictionary in the module data it
 * receives, and registers a callback for each of the six amok_pm messages.
 *
 * p: module data, used as an amok_pm_moddata_t
 *
 * NOTE(review): the callback arguments for kill/get/join/leave are not
 * visible in this excerpt; presumably the matching amok_pm_cb_* functions.
 */
232 static void _amok_pm_join(void *p) {
233 /* moddata management */
234 amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
237 mod->groups = xbt_dict_new();
240 gras_cb_register(gras_msgtype_by_name("amok_pm_kill"),
242 gras_cb_register(gras_msgtype_by_name("amok_pm_killrpc"),
243 &amok_pm_cb_killrpc);
245 gras_cb_register(gras_msgtype_by_name("amok_pm_get"),
247 gras_cb_register(gras_msgtype_by_name("amok_pm_join"),
249 gras_cb_register(gras_msgtype_by_name("amok_pm_leave"),
251 gras_cb_register(gras_msgtype_by_name("amok_pm_shutdown"),
252 &amok_pm_cb_shutdown);
/* Module hook passed to gras_module_add() in amok_pm_modulecreate().
 * No statement is visible in its body in this excerpt, only the comment
 * below. */
254 static void _amok_pm_exit(void) {
255 /* no world-wide globals */
/* Module hook passed to gras_module_add() in amok_pm_modulecreate();
 * counterpart of _amok_pm_join(). Frees the group dictionary held in the
 * module data and unregisters the callbacks of the six amok_pm messages.
 *
 * p: module data, used as an amok_pm_moddata_t
 *
 * NOTE(review): amok_pm_group_new() stores the group dynars with a NULL
 * free function, so groups still present in the dictionary at this point
 * are presumably not released by xbt_dict_free() -- confirm.
 */
257 static void _amok_pm_leave(void *p) {
259 amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
261 xbt_dict_free(&mod->groups);
264 gras_cb_unregister(gras_msgtype_by_name("amok_pm_kill"),
266 gras_cb_unregister(gras_msgtype_by_name("amok_pm_killrpc"),
267 &amok_pm_cb_killrpc);
269 gras_cb_unregister(gras_msgtype_by_name("amok_pm_get"),
271 gras_cb_unregister(gras_msgtype_by_name("amok_pm_join"),
273 gras_cb_unregister(gras_msgtype_by_name("amok_pm_leave"),
276 gras_cb_unregister(gras_msgtype_by_name("amok_pm_shutdown"),
277 &amok_pm_cb_shutdown);
/* Declares the "amok_pm" module to GRAS: its name, the size of its module
 * data structure, the address of amok_pm_moddata_id, and the four hooks
 * _amok_pm_init, _amok_pm_exit, _amok_pm_join and _amok_pm_leave, in that
 * order. */
280 void amok_pm_modulecreate() {
281 gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t), &amok_pm_moddata_id,
282 &_amok_pm_init,&_amok_pm_exit,&_amok_pm_join,&_amok_pm_leave);
289 * * Old module functions (kept for compatibility)
292 /** \brief Initialize the peer management module. Every process must run it before use
 *
 * Simply joins the "amok_pm" module through gras_module_join().
 */
293 void amok_pm_init() {
294 gras_module_join("amok_pm");
297 /** \brief Finalize the peer management module. Every process should run it after use
 *
 * Simply leaves the "amok_pm" module through gras_module_leave().
 */
298 void amok_pm_exit() {
299 gras_module_leave("amok_pm");