3 /* amok host management - servers main loop and remote host stopping */
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/sysdep.h"
12 #include "amok/hostmanagement.h"
13 #include "gras/Virtu/virtu_interface.h" /* libdata */
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_hm,amok,"Host management");
18 /* libdata management */
/* Identifier of this module's per-process data. It is assigned from
 * gras_procdata_add() in amok_hm_init() and handed to gras_libdata_by_id()
 * wherever the module data is needed; amok_hm_group_new() asserts that it is
 * no longer -1 before using it. */
19 static int amok_hm_libdata_id=-1;
24 unsigned int name_len;
29 } s_amok_hm_libdata_t, *amok_hm_libdata_t;
/* Builds the module data: allocates one s_amok_hm_libdata_t, sets its name to
 * a copy of "amok_hm" and gives it a newly created groups dictionary. */
31 static void *amok_hm_libdata_new() {
32 amok_hm_libdata_t res=xbt_new(s_amok_hm_libdata_t,1);
33 res->name=xbt_strdup("amok_hm");
36 res->groups = xbt_dict_new();
/* Destructor handed to gras_procdata_add() in amok_hm_init(): d is viewed as
 * an amok_hm_libdata_t and its groups dictionary is freed. */
39 static void amok_hm_libdata_free(void *d) {
40 amok_hm_libdata_t data=(amok_hm_libdata_t)d;
42 xbt_dict_free(&data->groups);
46 /* Message callbacks */
/* Message callback; presumably the one attached to "amok_hm_kill" -- TODO
 * confirm against the registration in amok_hm_init().
 * It fetches the module data through amok_hm_libdata_id. */
47 static int amok_hm_cb_kill(gras_msg_cb_ctx_t ctx,
50 amok_hm_libdata_t g=gras_libdata_by_id(amok_hm_libdata_id);
/* Callback registered for the "amok_hm_killrpc" RPC in amok_hm_init().
 * It fetches the module data through amok_hm_libdata_id and answers the
 * caller with a NULL result. */
54 static int amok_hm_cb_killrpc(gras_msg_cb_ctx_t ctx,
57 amok_hm_libdata_t g=gras_libdata_by_id(amok_hm_libdata_id);
59 gras_msg_rpcreturn(30,ctx,NULL);
/* RPC callback returning the content of a group; presumably attached to
 * "amok_hm_get" -- TODO confirm against the registration in amok_hm_init().
 * The payload is read as a pointer to the group name, the group is looked up
 * in the module's groups dictionary and sent back as the RPC result. */
63 static int amok_hm_cb_get(gras_msg_cb_ctx_t ctx, void *payload) {
64 amok_hm_libdata_t g=gras_libdata_by_id(amok_hm_libdata_id);
65 char *name = *(void**)payload; /* payload holds a pointer to the name */
66 xbt_dynar_t res = xbt_dict_get(g->groups, name);
68 gras_msg_rpcreturn(30, ctx, &res); /* the dynar handle is passed by address */
/* RPC callback adding the requester to a group; presumably attached to
 * "amok_hm_join" -- TODO confirm against the registration in amok_hm_init().
 * The payload is read as a pointer to the group name. A host record is built
 * from the peer name and port of the socket taken from the callback context,
 * logged at verbose level, pushed onto the group's dynar, and the RPC is
 * answered with a NULL result. */
71 static int amok_hm_cb_join(gras_msg_cb_ctx_t ctx, void *payload) {
72 amok_hm_libdata_t g=gras_libdata_by_id(amok_hm_libdata_id);
73 char *name = *(void**)payload;
74 xbt_dynar_t group = xbt_dict_get(g->groups, name);
/* socket associated with the callback context; its peer is the host recorded */
76 gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
77 xbt_host_t dude = xbt_host_new(gras_socket_peer_name(exp),
78 gras_socket_peer_port(exp));
80 VERB2("Contacted by %s:%d",dude->name,dude->port);
81 xbt_dynar_push(group,&dude);
83 gras_msg_rpcreturn(10, ctx, NULL);
/* RPC callback removing the requester from a group; presumably attached to
 * "amok_hm_leave" -- TODO confirm against the registration in amok_hm_init().
 * The payload is read as a pointer to the group name. A host record is built
 * from the peer name and port of the socket taken from the callback context;
 * the group is scanned for an entry with the same name and port, and such an
 * entry is removed through the iteration cursor. The warning text covers a
 * host that is not part of the group. The RPC is answered with a NULL
 * result. */
87 static int amok_hm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload) {
88 amok_hm_libdata_t g=gras_libdata_by_id(amok_hm_libdata_id);
89 char *name = *(void**)payload;
90 xbt_dynar_t group = xbt_dict_get(g->groups, name);
92 gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
/* NOTE(review): dude only serves as the comparison key below -- confirm it is released */
93 xbt_host_t dude = xbt_host_new(gras_socket_peer_name(exp),
94 gras_socket_peer_port(exp));
99 xbt_dynar_foreach(group, cpt, host_it) {
100 if (!strcmp(host_it->name, dude->name) &&
101 host_it->port == dude->port) {
102 xbt_dynar_cursor_rm (group,&cpt);
106 WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
107 dude->name,dude->port, name);
110 gras_msg_rpcreturn(30, ctx, NULL);
/* Callback registered for the "amok_hm_shutdown" RPC in amok_hm_init().
 * The payload is read as a pointer to the group name; the local group of that
 * name is shut down with amok_hm_group_shutdown() and the RPC is answered
 * with a NULL result. */
114 static int amok_hm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload) {
115 char *name = *(void**)payload;
116 amok_hm_group_shutdown(name);
118 gras_msg_rpcreturn(30, ctx, NULL);
123 /* Initialization stuff */
/* Flag tested by amok_hm_init() before it sets the module up; starts at 0.
 * NOTE(review): presumably counts or marks prior initializations -- confirm. */
124 static short amok_hm_used = 0;
126 /** \brief Initialize the host management module. Every process must run it before use */
/* The function tests amok_hm_used, registers the module data under the name
 * "amok_hm" (keeping the returned id in amok_hm_libdata_id, with
 * amok_hm_libdata_free as the last argument of that registration), declares
 * the plain "amok_hm_kill" message plus the "amok_hm_killrpc", "amok_hm_get",
 * "amok_hm_join", "amok_hm_leave" and "amok_hm_shutdown" RPCs, and registers
 * a callback for each of these message types.
 * The group RPCs are declared with the "string" datatype; "amok_hm_get" is
 * additionally declared with "xbt_dynar_t". */
127 void amok_hm_init() {
129 if (! amok_hm_used) {
134 /* module data on each process */
135 amok_hm_libdata_id = gras_procdata_add("amok_hm",
137 amok_hm_libdata_free);
139 /* Datatype and message declarations */
140 gras_msgtype_declare("amok_hm_kill",NULL);
141 gras_msgtype_declare_rpc("amok_hm_killrpc",NULL,NULL);
143 gras_msgtype_declare_rpc("amok_hm_get",
144 gras_datadesc_by_name("string"),
145 gras_datadesc_by_name("xbt_dynar_t"));
146 gras_msgtype_declare_rpc("amok_hm_join",
147 gras_datadesc_by_name("string"),
149 gras_msgtype_declare_rpc("amok_hm_leave",
150 gras_datadesc_by_name("string"),
153 gras_msgtype_declare_rpc("amok_hm_shutdown",
154 gras_datadesc_by_name("string"),
/* Callback registration, one per message type declared above */
160 gras_cb_register(gras_msgtype_by_name("amok_hm_kill"),
162 gras_cb_register(gras_msgtype_by_name("amok_hm_killrpc"),
163 &amok_hm_cb_killrpc);
165 gras_cb_register(gras_msgtype_by_name("amok_hm_get"),
167 gras_cb_register(gras_msgtype_by_name("amok_hm_join"),
169 gras_cb_register(gras_msgtype_by_name("amok_hm_leave"),
171 gras_cb_register(gras_msgtype_by_name("amok_hm_shutdown"),
172 &amok_hm_cb_shutdown);
175 /** \brief Finalize the host management module. Every process should run it after use */
/* Unregisters the callbacks of the six message types handled by this module:
 * "amok_hm_kill", "amok_hm_killrpc", "amok_hm_get", "amok_hm_join",
 * "amok_hm_leave" and "amok_hm_shutdown" (the same set that amok_hm_init()
 * registers). */
176 void amok_hm_exit() {
181 gras_cb_unregister(gras_msgtype_by_name("amok_hm_kill"),
183 gras_cb_unregister(gras_msgtype_by_name("amok_hm_killrpc"),
184 &amok_hm_cb_killrpc);
186 gras_cb_unregister(gras_msgtype_by_name("amok_hm_get"),
188 gras_cb_unregister(gras_msgtype_by_name("amok_hm_join"),
190 gras_cb_unregister(gras_msgtype_by_name("amok_hm_leave"),
193 gras_cb_unregister(gras_msgtype_by_name("amok_hm_shutdown"),
194 &amok_hm_cb_shutdown);
198 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
/* timeOut is handed unchanged to gras_msg_handle(); the module data is
 * fetched through amok_hm_libdata_id beforehand. */
199 void amok_hm_mainloop(double timeOut) {
200 amok_hm_libdata_t g=gras_libdata_by_id(amok_hm_libdata_id);
203 gras_msg_handle(timeOut);
207 /** \brief kill a buddy identified by its hostname and port */
/* A client socket to name:port is opened for the occasion and closed again
 * before returning. */
208 void amok_hm_kill_hp(char *name,int port) {
209 gras_socket_t sock=gras_socket_client(name,port);
211 gras_socket_close(sock);
214 /** \brief kill a buddy to which we have a socket already */
/* Sends the "amok_hm_kill" message over buddy with a NULL payload. */
215 void amok_hm_kill(gras_socket_t buddy) {
216 gras_msg_send(buddy,gras_msgtype_by_name("amok_hm_kill"),NULL);
219 /** \brief kill synchronously a buddy (do not return before its death) */
/* Issues the "amok_hm_killrpc" RPC on buddy, passing NULL for both trailing
 * arguments of the call. */
220 void amok_hm_kill_sync(gras_socket_t buddy) {
221 gras_msg_rpccall(buddy,30,gras_msgtype_by_name("amok_hm_killrpc"),NULL,NULL);
225 /** \brief create a new hostmanagement group located on local host
227 * The dynar elements are of type xbt_host_t
 *
 * The group is an xbt_dynar sized for xbt_host_t elements and created with
 * xbt_host_free_voidp (presumably the per-element free function; confirm),
 * then stored in the module's groups dictionary under group_name.
 * amok_hm_init() must have been run first; this is asserted through
 * amok_hm_libdata_id before the module data is fetched.
 * The dictionary entry is stored with a NULL free function, so whoever
 * removes the entry has to free the dynar itself.
229 xbt_dynar_t amok_hm_group_new(const char *group_name) {
231 xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_host_t),
232 xbt_host_free_voidp);
234 xbt_assert0(amok_hm_libdata_id != -1,"Run amok_hm_init first!");
235 g=gras_libdata_by_id(amok_hm_libdata_id);
237 xbt_dict_set(g->groups,group_name,res,NULL); /*FIXME: leaking xbt_dynar_free_voidp);*/
238 VERB1("Group %s created",group_name);
242 /** \brief retrieve all members of the given remote group */
/* Issues the "amok_hm_get" RPC on master, the socket to the process hosting
 * the group. */
243 xbt_dynar_t amok_hm_group_get(gras_socket_t master, const char *group_name) {
246 gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_hm_get"),
251 /** \brief add current host to the given remote group */
/* Issues the "amok_hm_join" RPC on master. The group name and the peer name
 * and port of master are logged at verbose level both before the call and
 * once it is done. */
252 void amok_hm_group_join(gras_socket_t master, const char *group_name) {
253 VERB3("Join group '%s' on %s:%d",
254 group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
255 gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_hm_join"),
257 VERB3("Joined group '%s' on %s:%d",
258 group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
260 /** \brief remove current host from the given remote group if found
262 * If not found, call is ignored
 *
 * Issues the "amok_hm_leave" RPC on master, then logs the group name
 * together with the peer name and port of master at verbose level.
264 void amok_hm_group_leave(gras_socket_t master, const char *group_name) {
265 gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_hm_leave"),
267 VERB3("Leaved group '%s' on %s:%d",
268 group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
271 /** \brief stops all members of the given local group */
/* The group is looked up by name in the module's groups dictionary, every
 * host it lists is killed through amok_hm_kill_hp() using its name and port,
 * and the group is then destroyed. */
272 void amok_hm_group_shutdown(const char *group_name) {
273 amok_hm_libdata_t g=gras_libdata_by_id(amok_hm_libdata_id);
274 xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
279 xbt_dynar_foreach(group, cpt, host_it) {
280 amok_hm_kill_hp(host_it->name, host_it->port);
/* freed by hand: amok_hm_group_new() stores the dynar with a NULL free function */
283 xbt_dynar_free(&group);
284 xbt_dict_remove(g->groups,group_name);
286 /** \brief stops all members of the given remote group */
/* Issues the "amok_hm_shutdown" RPC on master; the receiving side handles it
 * in amok_hm_cb_shutdown(), which calls amok_hm_group_shutdown(). */
287 void amok_hm_group_shutdown_remote(gras_socket_t master, const char *group_name){
288 gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_hm_shutdown"),