Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MPI_Reduce using waitany(). Buggy.
[simgrid.git] / src / amok / PeerManagement / peermanagement.c
1 /* $Id$ */
2
3 /* amok peer management - servers main loop and remote peer stopping        */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/sysdep.h"
11 #include "xbt/peer.h"
12 #include "amok/peermanagement.h"
13
14 #include "amok/amok_modinter.h" /* prototype of my module declaration */
15 #include "gras/module.h"        /* module mecanism */
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm, amok, "peer management");
18
19
20 /* data management */
21 int amok_pm_moddata_id = -1;
22 typedef struct {
23   int done;
24   xbt_dict_t groups;
25 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
26
27 /* Payload of join message */
28 typedef struct {
29   char *group;
30   int rank;
31 } s_amok_pm_msg_join_t, *amok_pm_msg_join_t;
32
33 /* Message callbacks */
34 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
35 {
36
37   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
38   g->done = 1;
39   return 0;
40 }
41
42 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx, void *payload_data)
43 {
44
45   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
46   g->done = 1;
47   gras_msg_rpcreturn(30, ctx, NULL);
48   return 0;
49 }
50
51 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload)
52 {
53   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
54   char *name = *(void **) payload;
55   xbt_dynar_t res = xbt_dict_get(g->groups, name);
56
57   gras_msg_rpcreturn(30, ctx, &res);
58   return 0;
59 }
60
61 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload)
62 {
63   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
64   amok_pm_msg_join_t msg = *(amok_pm_msg_join_t *) payload;
65   xbt_dynar_t group = xbt_dict_get(g->groups, msg->group);
66
67   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
68   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
69                                  gras_socket_peer_port(exp));
70   xbt_peer_t previous = NULL;
71
72   if (msg->rank >= 0 && xbt_dynar_length(group) >= msg->rank + 1)
73     xbt_dynar_get_cpy(group, msg->rank, &previous);
74
75   VERB3("Contacted by %s:%d for rank %d", dude->name, dude->port, msg->rank);
76   if (msg->rank < 0) {
77     xbt_dynar_push(group, &dude);
78   } else {
79     if (previous)
80       THROW4(arg_error, 0,
81              "You wanted to get rank %d of group %s, but %s:%d is already there",
82              msg->rank, msg->group, previous->name, previous->port);
83     xbt_dynar_set(group, msg->rank, &dude);
84   }
85
86   gras_msg_rpcreturn(10, ctx, NULL);
87   free(msg->group);
88   free(msg);
89   return 0;
90 }
91
92 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload)
93 {
94   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
95   char *name = *(void **) payload;
96   xbt_dynar_t group = xbt_dict_get(g->groups, name);
97
98   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
99   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
100                                  gras_socket_peer_port(exp));
101
102   unsigned int cpt;
103   xbt_peer_t peer_it;
104
105   xbt_dynar_foreach(group, cpt, peer_it) {
106     if (!strcmp(peer_it->name, dude->name) && peer_it->port == dude->port) {
107       xbt_dynar_cursor_rm(group, &cpt);
108       goto end;
109     }
110   }
111   WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
112         dude->name, dude->port, name);
113
114 end:
115   gras_msg_rpcreturn(30, ctx, NULL);
116   return 0;
117 }
118
119 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload)
120 {
121   char *name = *(void **) payload;
122   amok_pm_group_shutdown(name);
123
124   gras_msg_rpcreturn(30, ctx, NULL);
125   return 0;
126 }
127
128 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
129 void amok_pm_mainloop(double timeOut)
130 {
131   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
132
133   while (!g->done) {
134     gras_msg_handle(timeOut);
135   }
136 }
137
138 /** \brief kill a buddy identified by its peername and port. Note that it is not removed from any group it may belong to. */
139 void amok_pm_kill_hp(char *name, int port)
140 {
141   gras_socket_t sock = gras_socket_client(name, port);
142   amok_pm_kill(sock);
143   gras_socket_close(sock);
144 }
145
146 /** \brief kill a buddy to which we have a socket already. Note that it is not removed from any group it may belong to. */
147 void amok_pm_kill(gras_socket_t buddy)
148 {
149   gras_msg_send(buddy, "amok_pm_kill", NULL);
150 }
151
152 /** \brief kill syncronously a buddy (do not return before its death). Note that it is not removed from any group it may belong to. */
153 void amok_pm_kill_sync(gras_socket_t buddy)
154 {
155   gras_msg_rpccall(buddy, 30, "amok_pm_killrpc", NULL, NULL);
156 }
157
158
159 /** \brief create a new peermanagement group located on local peer 
160  *
161  * The dynar elements are of type xbt_peer_t
162  */
163 xbt_dynar_t amok_pm_group_new(const char *group_name)
164 {
165   amok_pm_moddata_t g;
166   xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
167                                   xbt_peer_free_voidp);
168
169   xbt_assert0(amok_pm_moddata_id != -1, "Run amok_pm_init first!");
170   g = gras_moddata_by_id(amok_pm_moddata_id);
171
172   DEBUG1("retrieved groups=%p", g->groups);
173
174   xbt_dict_set(g->groups, group_name, res, NULL);       /*FIXME: leaking xbt_dynar_free_voidp); */
175   VERB1("Group %s created", group_name);
176
177   return res;
178 }
179
180 /** \brief retrieve all members of the given remote group */
181 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name)
182 {
183   xbt_dynar_t res;
184
185   gras_msg_rpccall(master, 30, "amok_pm_get", &group_name, &res);
186   return res;
187 }
188
189 /** \brief add current peer to the given remote group 
190  *
191  * You should provide rank only when you want to force the order of members 
192  * (for example in strict test cases). Give -1 to leave the system choose it for you.
193  */
194 void amok_pm_group_join(gras_socket_t master, const char *group_name,
195                         int rank)
196 {
197   amok_pm_msg_join_t msg = xbt_new(s_amok_pm_msg_join_t, 1);
198   msg->group = (char *) group_name;
199   msg->rank = rank;
200   VERB4("Join group '%s' on %s:%d (at rank %d)",
201         group_name, gras_socket_peer_name(master),
202         gras_socket_peer_port(master), rank);
203   gras_msg_rpccall(master, 30, "amok_pm_join", &msg, NULL);
204   free(msg);
205   VERB3("Joined group '%s' on %s:%d",
206         group_name, gras_socket_peer_name(master),
207         gras_socket_peer_port(master));
208 }
209
210 /** \brief remove current peer from the given remote group if found
211  *
212  * If not found, call is ignored 
213  */
214 void amok_pm_group_leave(gras_socket_t master, const char *group_name)
215 {
216   gras_msg_rpccall(master, 30, "amok_pm_leave", &group_name, NULL);
217   VERB3("Leaved group '%s' on %s:%d",
218         group_name, gras_socket_peer_name(master),
219         gras_socket_peer_port(master));
220 }
221
222 /** \brief stops all members of the given local group */
223 void amok_pm_group_shutdown(const char *group_name)
224 {
225   amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
226   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
227
228   unsigned int cpt;
229   xbt_peer_t peer_it;
230
231   xbt_dynar_foreach(group, cpt, peer_it) {
232     amok_pm_kill_hp(peer_it->name, peer_it->port);
233   }
234
235   xbt_dynar_free(&group);
236   xbt_dict_remove(g->groups, group_name);
237 }
238
239 /** \brief stops all members of the given remote group */
240 void amok_pm_group_shutdown_remote(gras_socket_t master,
241                                    const char *group_name)
242 {
243   gras_msg_rpccall(master, 30, "amok_pm_shutdown", &group_name, NULL);
244 }
245
246
247 /* *
248  * *
249  * * Module management functions
250  * *
251  * */
252
253
254
255 static void _amok_pm_init(void)
256 {
257   /* no world-wide globals */
258   /* Datatype and message declarations */
259   gras_datadesc_type_t pm_group_type =
260     gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"),
261                         xbt_peer_free_voidp);
262
263   gras_datadesc_type_t msg_join_t =
264     gras_datadesc_struct("s_amok_pm_moddata_t");
265   gras_datadesc_struct_append(msg_join_t, "group",
266                               gras_datadesc_by_name("string"));
267   gras_datadesc_struct_append(msg_join_t, "rank",
268                               gras_datadesc_by_name("int"));
269   gras_datadesc_struct_close(msg_join_t);
270   msg_join_t =
271     gras_datadesc_ref("amok_pm_moddata_t",
272                       gras_datadesc_by_name("s_amok_pm_moddata_t"));
273
274   gras_msgtype_declare("amok_pm_kill", NULL);
275   gras_msgtype_declare_rpc("amok_pm_killrpc", NULL, NULL);
276
277   gras_msgtype_declare_rpc("amok_pm_get",
278                            gras_datadesc_by_name("string"), pm_group_type);
279   gras_msgtype_declare_rpc("amok_pm_join", msg_join_t, NULL);
280   gras_msgtype_declare_rpc("amok_pm_leave",
281                            gras_datadesc_by_name("string"), NULL);
282
283   gras_msgtype_declare_rpc("amok_pm_shutdown",
284                            gras_datadesc_by_name("string"), NULL);
285 }
286
287 static void _amok_pm_join(void *p)
288 {
289   /* moddata management */
290   amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
291
292   mod->groups = NULL;
293
294   mod->done = 0;
295   mod->groups = xbt_dict_new();
296
297   /* callbacks */
298   gras_cb_register("amok_pm_kill", &amok_pm_cb_kill);
299   gras_cb_register("amok_pm_killrpc", &amok_pm_cb_killrpc);
300
301   gras_cb_register("amok_pm_get", &amok_pm_cb_get);
302   gras_cb_register("amok_pm_join", &amok_pm_cb_join);
303   gras_cb_register("amok_pm_leave", &amok_pm_cb_leave);
304   gras_cb_register("amok_pm_shutdown", &amok_pm_cb_shutdown);
305 }
306
307 static void _amok_pm_exit(void)
308 {
309   /* no world-wide globals */
310 }
311
312 static void _amok_pm_leave(void *p)
313 {
314   /* moddata */
315   amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
316
317   if (mod->groups)
318     xbt_dict_free(&mod->groups);
319
320   /* callbacks */
321   gras_cb_unregister("amok_pm_kill", &amok_pm_cb_kill);
322   gras_cb_unregister("amok_pm_killrpc", &amok_pm_cb_killrpc);
323
324   gras_cb_unregister("amok_pm_get", &amok_pm_cb_get);
325   gras_cb_unregister("amok_pm_join", &amok_pm_cb_join);
326   gras_cb_unregister("amok_pm_leave", &amok_pm_cb_leave);
327   gras_cb_unregister("amok_pm_shutdown", &amok_pm_cb_shutdown);
328 }
329
330 void amok_pm_modulecreate()
331 {
332   gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t), &amok_pm_moddata_id,
333                   _amok_pm_init, _amok_pm_exit, _amok_pm_join,
334                   _amok_pm_leave);
335 }
336
337
338
339 /* *
340  * *
341  * * Old module functions (kept for compatibility)
342  * *
343  * */
344 /** \brief Initialize the peer management module. Every process must run it before use */
345 void amok_pm_init()
346 {
347   gras_module_join("amok_pm");
348 }
349
350 /** \brief Finalize the peer management module. Every process should run it after use */
351 void amok_pm_exit()
352 {
353   gras_module_leave("amok_pm");
354 }