Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill old $Id$ command dating from CVS
[simgrid.git] / src / amok / PeerManagement / peermanagement.c
1 /* amok peer management - servers main loop and remote peer stopping        */
2
3 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "xbt/sysdep.h"
9 #include "xbt/peer.h"
10 #include "amok/peermanagement.h"
11
12 #include "amok/amok_modinter.h" /* prototype of my module declaration */
13 #include "gras/module.h"        /* module mecanism */
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm, amok, "peer management");
16
17
18 /* data management */
19 int amok_pm_moddata_id = -1;
20 typedef struct {
21   int done;
22   xbt_dict_t groups;
23 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
24
25 /* Message callbacks */
26 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
27 {
28
29   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
30   g->done = 1;
31   return 0;
32 }
33
34 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx, void *payload_data)
35 {
36
37   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
38   g->done = 1;
39   gras_msg_rpcreturn(30, ctx, NULL);
40   return 0;
41 }
42
43 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload)
44 {
45   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
46   char *name = *(void **) payload;
47   xbt_dynar_t res = xbt_dict_get(g->groups, name);
48
49   gras_msg_rpcreturn(30, ctx, &res);
50   return 0;
51 }
52
53 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload)
54 {
55   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
56   char* group_name = *(char* *) payload;
57   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
58   int rank;
59
60   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
61   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
62                                  gras_socket_peer_port(exp));
63
64   rank = xbt_dynar_length(group);
65   xbt_dynar_push(group, &dude);
66   VERB3("Contacted by %s:%d. Give it rank #%d", dude->name, dude->port,rank);
67
68   gras_msg_rpcreturn(10, ctx, &rank);
69   free(group_name);
70   return 0;
71 }
72
73 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload)
74 {
75   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
76   char *name = *(void **) payload;
77   xbt_dynar_t group = xbt_dict_get(g->groups, name);
78
79   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
80   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
81                                  gras_socket_peer_port(exp));
82
83   unsigned int cpt;
84   xbt_peer_t peer_it;
85
86   xbt_dynar_foreach(group, cpt, peer_it) {
87     if (!strcmp(peer_it->name, dude->name) && peer_it->port == dude->port) {
88       xbt_dynar_cursor_rm(group, &cpt);
89       goto end;
90     }
91   }
92   WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
93         dude->name, dude->port, name);
94
95 end:
96   gras_msg_rpcreturn(30, ctx, NULL);
97   return 0;
98 }
99
100 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload)
101 {
102   char *name = *(void **) payload;
103   amok_pm_group_shutdown(name);
104
105   gras_msg_rpcreturn(30, ctx, NULL);
106   return 0;
107 }
108
109 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
110 void amok_pm_mainloop(double timeOut)
111 {
112   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
113
114   while (!g->done) {
115     gras_msg_handle(timeOut);
116   }
117 }
118
119 /** \brief kill a buddy identified by its peername and port. Note that it is not removed from any group it may belong to. */
120 void amok_pm_kill_hp(char *name, int port)
121 {
122   gras_socket_t sock = gras_socket_client(name, port);
123   amok_pm_kill(sock);
124   gras_socket_close(sock);
125 }
126
127 /** \brief kill a buddy to which we have a socket already. Note that it is not removed from any group it may belong to. */
128 void amok_pm_kill(gras_socket_t buddy)
129 {
130   gras_msg_send(buddy, "amok_pm_kill", NULL);
131 }
132
133 /** \brief kill syncronously a buddy (do not return before its death). Note that it is not removed from any group it may belong to. */
134 void amok_pm_kill_sync(gras_socket_t buddy)
135 {
136   gras_msg_rpccall(buddy, 30, "amok_pm_killrpc", NULL, NULL);
137 }
138
139
140 /** \brief create a new peermanagement group located on local peer 
141  *
142  * The dynar elements are of type xbt_peer_t
143  */
144 xbt_dynar_t amok_pm_group_new(const char *group_name)
145 {
146   amok_pm_moddata_t g;
147   xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
148                                   xbt_peer_free_voidp);
149
150   xbt_assert0(amok_pm_moddata_id != -1, "Run amok_pm_init first!");
151   g = gras_moddata_by_id(amok_pm_moddata_id);
152
153   DEBUG1("retrieved groups=%p", g->groups);
154
155   xbt_dict_set(g->groups, group_name, res, NULL);       /*FIXME: leaking xbt_dynar_free_voidp); */
156   VERB1("Group %s created", group_name);
157
158   return res;
159 }
160
161 /** \brief retrieve all members of the given remote group */
162 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name)
163 {
164   xbt_dynar_t res;
165
166   gras_msg_rpccall(master, 30, "amok_pm_get", &group_name, &res);
167   return res;
168 }
169
170 /** \brief add current peer to the given remote group 
171  *
172  * Returns the rank of the process in the group.
173  */
174 int amok_pm_group_join(gras_socket_t master, const char *group_name)
175 {
176   int rank;
177   VERB3("Join group '%s' on %s:%d",
178         group_name, gras_socket_peer_name(master),
179         gras_socket_peer_port(master));
180   gras_msg_rpccall(master, 30, "amok_pm_join", &group_name, &rank);
181   VERB4("Joined group '%s' on %s:%d. Got rank %d",
182         group_name, gras_socket_peer_name(master),
183         gras_socket_peer_port(master),
184         rank);
185   return rank;
186 }
187
188 /** \brief remove current peer from the given remote group if found
189  *
190  * If not found, call is ignored 
191  */
192 void amok_pm_group_leave(gras_socket_t master, const char *group_name)
193 {
194   gras_msg_rpccall(master, 30, "amok_pm_leave", &group_name, NULL);
195   VERB3("Leaved group '%s' on %s:%d",
196         group_name, gras_socket_peer_name(master),
197         gras_socket_peer_port(master));
198 }
199
200 /** \brief stops all members of the given local group */
201 void amok_pm_group_shutdown(const char *group_name)
202 {
203   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
204   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
205
206   unsigned int cpt;
207   xbt_peer_t peer_it;
208
209   xbt_dynar_foreach(group, cpt, peer_it) {
210     amok_pm_kill_hp(peer_it->name, peer_it->port);
211   }
212
213   xbt_dynar_free(&group);
214   xbt_dict_remove(g->groups, group_name);
215 }
216
217 /** \brief stops all members of the given remote group */
218 void amok_pm_group_shutdown_remote(gras_socket_t master,
219                                    const char *group_name)
220 {
221   gras_msg_rpccall(master, 30, "amok_pm_shutdown", &group_name, NULL);
222 }
223
224
225 /* *
226  * *
227  * * Module management functions
228  * *
229  * */
230
231
232
233 static void _amok_pm_init(void)
234 {
235   /* no world-wide globals */
236   /* Datatype and message declarations */
237   gras_datadesc_type_t pm_group_type =
238     gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"),
239                         xbt_peer_free_voidp);
240
241   gras_msgtype_declare("amok_pm_kill", NULL);
242   gras_msgtype_declare_rpc("amok_pm_killrpc", NULL, NULL);
243
244   gras_msgtype_declare_rpc("amok_pm_get",
245                            gras_datadesc_by_name("string"), pm_group_type);
246   gras_msgtype_declare_rpc("amok_pm_join", gras_datadesc_by_name("string"), gras_datadesc_by_name("int"));
247   gras_msgtype_declare_rpc("amok_pm_leave",
248                            gras_datadesc_by_name("string"), NULL);
249
250   gras_msgtype_declare_rpc("amok_pm_shutdown",
251                            gras_datadesc_by_name("string"), NULL);
252 }
253
254 static void _amok_pm_join(void *p)
255 {
256   /* moddata management */
257   amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
258
259   mod->groups = NULL;
260
261   mod->done = 0;
262   mod->groups = xbt_dict_new();
263
264   /* callbacks */
265   gras_cb_register("amok_pm_kill", &amok_pm_cb_kill);
266   gras_cb_register("amok_pm_killrpc", &amok_pm_cb_killrpc);
267
268   gras_cb_register("amok_pm_get", &amok_pm_cb_get);
269   gras_cb_register("amok_pm_join", &amok_pm_cb_join);
270   gras_cb_register("amok_pm_leave", &amok_pm_cb_leave);
271   gras_cb_register("amok_pm_shutdown", &amok_pm_cb_shutdown);
272 }
273
274 static void _amok_pm_exit(void)
275 {
276   /* no world-wide globals */
277 }
278
279 static void _amok_pm_leave(void *p)
280 {
281   /* moddata */
282   amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
283
284   if (mod->groups)
285     xbt_dict_free(&mod->groups);
286
287   /* callbacks */
288   gras_cb_unregister("amok_pm_kill", &amok_pm_cb_kill);
289   gras_cb_unregister("amok_pm_killrpc", &amok_pm_cb_killrpc);
290
291   gras_cb_unregister("amok_pm_get", &amok_pm_cb_get);
292   gras_cb_unregister("amok_pm_join", &amok_pm_cb_join);
293   gras_cb_unregister("amok_pm_leave", &amok_pm_cb_leave);
294   gras_cb_unregister("amok_pm_shutdown", &amok_pm_cb_shutdown);
295 }
296
297 void amok_pm_modulecreate()
298 {
299   gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t), &amok_pm_moddata_id,
300                   _amok_pm_init, _amok_pm_exit, _amok_pm_join,
301                   _amok_pm_leave);
302 }
303
304
305
306 /* *
307  * *
308  * * Old module functions (kept for compatibility)
309  * *
310  * */
311 /** \brief Initialize the peer management module. Every process must run it before use */
312 void amok_pm_init()
313 {
314   gras_module_join("amok_pm");
315 }
316
317 /** \brief Finalize the peer management module. Every process should run it after use */
318 void amok_pm_exit()
319 {
320   gras_module_leave("amok_pm");
321 }