Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add some missing copyright headers
[simgrid.git] / src / amok / PeerManagement / peermanagement.c
1 /* $Id$ */
2
3 /* amok peer management - servers main loop and remote peer stopping        */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/sysdep.h"
11 #include "xbt/peer.h"
12 #include "amok/peermanagement.h"
13
14 #include "amok/amok_modinter.h" /* prototype of my module declaration */
15 #include "gras/module.h"        /* module mecanism */
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm, amok, "peer management");
18
19
20 /* data management */
21 int amok_pm_moddata_id = -1;
22 typedef struct {
23   int done;
24   xbt_dict_t groups;
25 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
26
27 /* Message callbacks */
28 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
29 {
30
31   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
32   g->done = 1;
33   return 0;
34 }
35
36 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx, void *payload_data)
37 {
38
39   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
40   g->done = 1;
41   gras_msg_rpcreturn(30, ctx, NULL);
42   return 0;
43 }
44
45 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload)
46 {
47   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
48   char *name = *(void **) payload;
49   xbt_dynar_t res = xbt_dict_get(g->groups, name);
50
51   gras_msg_rpcreturn(30, ctx, &res);
52   return 0;
53 }
54
55 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload)
56 {
57   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
58   char* group_name = *(char* *) payload;
59   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
60   int rank;
61
62   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
63   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
64                                  gras_socket_peer_port(exp));
65
66   rank = xbt_dynar_length(group);
67   xbt_dynar_push(group, &dude);
68   VERB3("Contacted by %s:%d. Give it rank #%d", dude->name, dude->port,rank);
69
70   gras_msg_rpcreturn(10, ctx, &rank);
71   free(group_name);
72   return 0;
73 }
74
75 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload)
76 {
77   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
78   char *name = *(void **) payload;
79   xbt_dynar_t group = xbt_dict_get(g->groups, name);
80
81   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
82   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
83                                  gras_socket_peer_port(exp));
84
85   unsigned int cpt;
86   xbt_peer_t peer_it;
87
88   xbt_dynar_foreach(group, cpt, peer_it) {
89     if (!strcmp(peer_it->name, dude->name) && peer_it->port == dude->port) {
90       xbt_dynar_cursor_rm(group, &cpt);
91       goto end;
92     }
93   }
94   WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
95         dude->name, dude->port, name);
96
97 end:
98   gras_msg_rpcreturn(30, ctx, NULL);
99   return 0;
100 }
101
102 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload)
103 {
104   char *name = *(void **) payload;
105   amok_pm_group_shutdown(name);
106
107   gras_msg_rpcreturn(30, ctx, NULL);
108   return 0;
109 }
110
111 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
112 void amok_pm_mainloop(double timeOut)
113 {
114   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
115
116   while (!g->done) {
117     gras_msg_handle(timeOut);
118   }
119 }
120
121 /** \brief kill a buddy identified by its peername and port. Note that it is not removed from any group it may belong to. */
122 void amok_pm_kill_hp(char *name, int port)
123 {
124   gras_socket_t sock = gras_socket_client(name, port);
125   amok_pm_kill(sock);
126   gras_socket_close(sock);
127 }
128
129 /** \brief kill a buddy to which we have a socket already. Note that it is not removed from any group it may belong to. */
130 void amok_pm_kill(gras_socket_t buddy)
131 {
132   gras_msg_send(buddy, "amok_pm_kill", NULL);
133 }
134
135 /** \brief kill syncronously a buddy (do not return before its death). Note that it is not removed from any group it may belong to. */
136 void amok_pm_kill_sync(gras_socket_t buddy)
137 {
138   gras_msg_rpccall(buddy, 30, "amok_pm_killrpc", NULL, NULL);
139 }
140
141
142 /** \brief create a new peermanagement group located on local peer 
143  *
144  * The dynar elements are of type xbt_peer_t
145  */
146 xbt_dynar_t amok_pm_group_new(const char *group_name)
147 {
148   amok_pm_moddata_t g;
149   xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
150                                   xbt_peer_free_voidp);
151
152   xbt_assert0(amok_pm_moddata_id != -1, "Run amok_pm_init first!");
153   g = gras_moddata_by_id(amok_pm_moddata_id);
154
155   DEBUG1("retrieved groups=%p", g->groups);
156
157   xbt_dict_set(g->groups, group_name, res, NULL);       /*FIXME: leaking xbt_dynar_free_voidp); */
158   VERB1("Group %s created", group_name);
159
160   return res;
161 }
162
163 /** \brief retrieve all members of the given remote group */
164 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name)
165 {
166   xbt_dynar_t res;
167
168   gras_msg_rpccall(master, 30, "amok_pm_get", &group_name, &res);
169   return res;
170 }
171
172 /** \brief add current peer to the given remote group 
173  *
174  * Returns the rank of the process in the group.
175  */
176 int amok_pm_group_join(gras_socket_t master, const char *group_name)
177 {
178   int rank;
179   VERB3("Join group '%s' on %s:%d",
180         group_name, gras_socket_peer_name(master),
181         gras_socket_peer_port(master));
182   gras_msg_rpccall(master, 30, "amok_pm_join", &group_name, &rank);
183   VERB4("Joined group '%s' on %s:%d. Got rank %d",
184         group_name, gras_socket_peer_name(master),
185         gras_socket_peer_port(master),
186         rank);
187   return rank;
188 }
189
190 /** \brief remove current peer from the given remote group if found
191  *
192  * If not found, call is ignored 
193  */
194 void amok_pm_group_leave(gras_socket_t master, const char *group_name)
195 {
196   gras_msg_rpccall(master, 30, "amok_pm_leave", &group_name, NULL);
197   VERB3("Leaved group '%s' on %s:%d",
198         group_name, gras_socket_peer_name(master),
199         gras_socket_peer_port(master));
200 }
201
202 /** \brief stops all members of the given local group */
203 void amok_pm_group_shutdown(const char *group_name)
204 {
205   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
206   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
207
208   unsigned int cpt;
209   xbt_peer_t peer_it;
210
211   xbt_dynar_foreach(group, cpt, peer_it) {
212     amok_pm_kill_hp(peer_it->name, peer_it->port);
213   }
214
215   xbt_dynar_free(&group);
216   xbt_dict_remove(g->groups, group_name);
217 }
218
219 /** \brief stops all members of the given remote group */
220 void amok_pm_group_shutdown_remote(gras_socket_t master,
221                                    const char *group_name)
222 {
223   gras_msg_rpccall(master, 30, "amok_pm_shutdown", &group_name, NULL);
224 }
225
226
227 /* *
228  * *
229  * * Module management functions
230  * *
231  * */
232
233
234
235 static void _amok_pm_init(void)
236 {
237   /* no world-wide globals */
238   /* Datatype and message declarations */
239   gras_datadesc_type_t pm_group_type =
240     gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"),
241                         xbt_peer_free_voidp);
242
243   gras_msgtype_declare("amok_pm_kill", NULL);
244   gras_msgtype_declare_rpc("amok_pm_killrpc", NULL, NULL);
245
246   gras_msgtype_declare_rpc("amok_pm_get",
247                            gras_datadesc_by_name("string"), pm_group_type);
248   gras_msgtype_declare_rpc("amok_pm_join", gras_datadesc_by_name("string"), gras_datadesc_by_name("int"));
249   gras_msgtype_declare_rpc("amok_pm_leave",
250                            gras_datadesc_by_name("string"), NULL);
251
252   gras_msgtype_declare_rpc("amok_pm_shutdown",
253                            gras_datadesc_by_name("string"), NULL);
254 }
255
256 static void _amok_pm_join(void *p)
257 {
258   /* moddata management */
259   amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
260
261   mod->groups = NULL;
262
263   mod->done = 0;
264   mod->groups = xbt_dict_new();
265
266   /* callbacks */
267   gras_cb_register("amok_pm_kill", &amok_pm_cb_kill);
268   gras_cb_register("amok_pm_killrpc", &amok_pm_cb_killrpc);
269
270   gras_cb_register("amok_pm_get", &amok_pm_cb_get);
271   gras_cb_register("amok_pm_join", &amok_pm_cb_join);
272   gras_cb_register("amok_pm_leave", &amok_pm_cb_leave);
273   gras_cb_register("amok_pm_shutdown", &amok_pm_cb_shutdown);
274 }
275
276 static void _amok_pm_exit(void)
277 {
278   /* no world-wide globals */
279 }
280
281 static void _amok_pm_leave(void *p)
282 {
283   /* moddata */
284   amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
285
286   if (mod->groups)
287     xbt_dict_free(&mod->groups);
288
289   /* callbacks */
290   gras_cb_unregister("amok_pm_kill", &amok_pm_cb_kill);
291   gras_cb_unregister("amok_pm_killrpc", &amok_pm_cb_killrpc);
292
293   gras_cb_unregister("amok_pm_get", &amok_pm_cb_get);
294   gras_cb_unregister("amok_pm_join", &amok_pm_cb_join);
295   gras_cb_unregister("amok_pm_leave", &amok_pm_cb_leave);
296   gras_cb_unregister("amok_pm_shutdown", &amok_pm_cb_shutdown);
297 }
298
299 void amok_pm_modulecreate()
300 {
301   gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t), &amok_pm_moddata_id,
302                   _amok_pm_init, _amok_pm_exit, _amok_pm_join,
303                   _amok_pm_leave);
304 }
305
306
307
308 /* *
309  * *
310  * * Old module functions (kept for compatibility)
311  * *
312  * */
313 /** \brief Initialize the peer management module. Every process must run it before use */
314 void amok_pm_init()
315 {
316   gras_module_join("amok_pm");
317 }
318
319 /** \brief Finalize the peer management module. Every process should run it after use */
320 void amok_pm_exit()
321 {
322   gras_module_leave("amok_pm");
323 }