Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
73c383f3ea9eb1ed5a1041abd2e4cca993f26934
[simgrid.git] / src / amok / PeerManagement / peermanagement.c
1 /* $Id$ */
2
3 /* amok peer management - servers main loop and remote peer stopping        */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/sysdep.h"
11 #include "xbt/peer.h"
12 #include "amok/peermanagement.h"
13 #include "gras/Virtu/virtu_interface.h" /* libdata */
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm,amok,"peer management");
16
17
18 /* libdata management */
19 static int amok_pm_libdata_id=-1;
20 typedef struct {
21   /* set headers */
22   unsigned int ID;
23   char        *name;
24   unsigned int name_len;
25
26   /* payload */
27   int done;
28   xbt_dict_t groups;
29 } s_amok_pm_libdata_t, *amok_pm_libdata_t;
30
31 static void *amok_pm_libdata_new() {
32   amok_pm_libdata_t res=xbt_new(s_amok_pm_libdata_t,1);
33   res->name=xbt_strdup("amok_pm");
34   res->name_len=0;
35   res->done = 0;
36   res->groups = xbt_dict_new();
37   return res;
38 }
39 static void amok_pm_libdata_free(void *d) {
40   amok_pm_libdata_t data=(amok_pm_libdata_t)d;
41   free(data->name);
42   xbt_dict_free(&data->groups);
43   free (data);
44 }
45
46 /* Message callbacks */
47 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx,
48                            void             *payload_data) {
49
50   amok_pm_libdata_t g=gras_libdata_by_id(amok_pm_libdata_id);
51   g->done = 1;
52   return 1;
53 }
54 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx,
55                               void             *payload_data) {
56
57   amok_pm_libdata_t g=gras_libdata_by_id(amok_pm_libdata_id);
58   g->done = 1;
59   gras_msg_rpcreturn(30,ctx,NULL);
60   return 1;
61 }
62
63 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload) {
64   amok_pm_libdata_t g=gras_libdata_by_id(amok_pm_libdata_id);
65   char *name = *(void**)payload;
66   xbt_dynar_t res = xbt_dict_get(g->groups, name);
67
68   gras_msg_rpcreturn(30, ctx, &res);
69   return 1;
70 }
71 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload) {
72   amok_pm_libdata_t g=gras_libdata_by_id(amok_pm_libdata_id);
73   char *name = *(void**)payload;
74   xbt_dynar_t group = xbt_dict_get(g->groups, name);
75   
76   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
77   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
78                                  gras_socket_peer_port(exp));
79
80   VERB2("Contacted by %s:%d",dude->name,dude->port);
81   xbt_dynar_push(group,&dude);
82
83   gras_msg_rpcreturn(10, ctx, NULL);
84   free(name);
85   return 1;
86 }
87 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload) {
88   amok_pm_libdata_t g=gras_libdata_by_id(amok_pm_libdata_id);
89   char *name = *(void**)payload;
90   xbt_dynar_t group = xbt_dict_get(g->groups, name);
91   
92   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
93   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
94                                  gras_socket_peer_port(exp));
95
96   int cpt;
97   xbt_peer_t peer_it;
98
99   xbt_dynar_foreach(group, cpt, peer_it) {
100     if (!strcmp(peer_it->name, dude->name) && 
101         peer_it->port == dude->port) {
102       xbt_dynar_cursor_rm (group,&cpt);
103       goto end;
104     }
105   }
106   WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
107         dude->name,dude->port, name);
108
109  end:
110   gras_msg_rpcreturn(30, ctx, NULL);
111   return 1;
112 }
113
114 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload) {
115   char *name = *(void**)payload;
116   amok_pm_group_shutdown(name);
117
118   gras_msg_rpcreturn(30, ctx, NULL);
119   return 1;
120 }
121
122
123 /* Initialization stuff */
124 static short amok_pm_used = 0;
125
126 /** \brief Initialize the peer management module. Every process must run it before use */
127 void amok_pm_init() {
128   /* pure INIT part */
129   if (! amok_pm_used) {
130
131     /* dependencies */
132     amok_base_init();
133
134     /* module data on each process */
135     amok_pm_libdata_id = gras_procdata_add("amok_pm",
136                                            amok_pm_libdata_new,
137                                            amok_pm_libdata_free);
138
139     /* Datatype and message declarations */
140     gras_msgtype_declare("amok_pm_kill",NULL);   
141     gras_msgtype_declare_rpc("amok_pm_killrpc",NULL,NULL);   
142
143     gras_msgtype_declare_rpc("amok_pm_get",
144                              gras_datadesc_by_name("string"),
145                              gras_datadesc_by_name("xbt_dynar_t"));
146     gras_msgtype_declare_rpc("amok_pm_join",
147                              gras_datadesc_by_name("string"),
148                              NULL);
149     gras_msgtype_declare_rpc("amok_pm_leave",
150                              gras_datadesc_by_name("string"),
151                              NULL);
152
153     gras_msgtype_declare_rpc("amok_pm_shutdown",
154                              gras_datadesc_by_name("string"),
155                              NULL);
156   }
157   amok_pm_used++;
158
159   /* JOIN part */
160   gras_cb_register(gras_msgtype_by_name("amok_pm_kill"),
161                    &amok_pm_cb_kill);
162   gras_cb_register(gras_msgtype_by_name("amok_pm_killrpc"),
163                    &amok_pm_cb_killrpc);
164
165   gras_cb_register(gras_msgtype_by_name("amok_pm_get"),
166                    &amok_pm_cb_get);
167   gras_cb_register(gras_msgtype_by_name("amok_pm_join"),
168                    &amok_pm_cb_join);
169   gras_cb_register(gras_msgtype_by_name("amok_pm_leave"),
170                    &amok_pm_cb_leave);
171   gras_cb_register(gras_msgtype_by_name("amok_pm_shutdown"),
172                    &amok_pm_cb_shutdown);
173 }
174
175 /** \brief Finalize the peer management module. Every process should run it after use */
176 void amok_pm_exit() {
177   /* pure EXIT part */
178   amok_pm_used--;
179
180   /* LEAVE part */
181   gras_cb_unregister(gras_msgtype_by_name("amok_pm_kill"),
182                      &amok_pm_cb_kill);
183   gras_cb_unregister(gras_msgtype_by_name("amok_pm_killrpc"),
184                      &amok_pm_cb_killrpc);
185
186   gras_cb_unregister(gras_msgtype_by_name("amok_pm_get"),
187                      &amok_pm_cb_get);
188   gras_cb_unregister(gras_msgtype_by_name("amok_pm_join"),
189                      &amok_pm_cb_join);
190   gras_cb_unregister(gras_msgtype_by_name("amok_pm_leave"),
191                      &amok_pm_cb_leave);
192
193   gras_cb_unregister(gras_msgtype_by_name("amok_pm_shutdown"),
194                      &amok_pm_cb_shutdown);
195 }
196
197
198 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
199 void amok_pm_mainloop(double timeOut) {
200   amok_pm_libdata_t g=gras_libdata_by_id(amok_pm_libdata_id);
201   
202   while (!g->done) {
203     gras_msg_handle(timeOut);
204   }
205 }
206
207 /** \brief kill a buddy identified by its peername and port */
208 void amok_pm_kill_hp(char *name,int port) {
209   gras_socket_t sock=gras_socket_client(name,port);
210   amok_pm_kill(sock);
211   gras_socket_close(sock);
212 }
213
214 /** \brief kill a buddy to which we have a socket already */
215 void amok_pm_kill(gras_socket_t buddy) {
216   gras_msg_send(buddy,gras_msgtype_by_name("amok_pm_kill"),NULL);
217 }
218
219 /** \brief kill syncronously a buddy (do not return before its death) */
220 void amok_pm_kill_sync(gras_socket_t buddy) {
221   gras_msg_rpccall(buddy,30,gras_msgtype_by_name("amok_pm_killrpc"),NULL,NULL);
222 }
223
224
225 /** \brief create a new peermanagement group located on local peer 
226  *
227  * The dynar elements are of type xbt_peer_t
228  */
229 xbt_dynar_t amok_pm_group_new(const char *group_name) {
230   amok_pm_libdata_t g;
231   xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
232                                   xbt_peer_free_voidp);
233
234   xbt_assert0(amok_pm_libdata_id != -1,"Run amok_pm_init first!");
235   g=gras_libdata_by_id(amok_pm_libdata_id);
236    
237   xbt_dict_set(g->groups,group_name,res,NULL); /*FIXME: leaking xbt_dynar_free_voidp);*/
238   VERB1("Group %s created",group_name);
239
240   return res;
241 }
242 /** \brief retrieve all members of the given remote group */
243 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name) {
244   xbt_dynar_t res;
245   
246   gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_get"),
247                    &group_name,&res);
248   return res;
249 }
250
251 /** \brief add current peer to the given remote group */
252 void        amok_pm_group_join(gras_socket_t master, const char *group_name) {
253   VERB3("Join group '%s' on %s:%d",
254         group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
255   gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_join"),
256                    &group_name,NULL);
257   VERB3("Joined group '%s' on %s:%d",
258         group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
259 }
260 /** \brief remove current peer from the given remote group if found
261  *
262  * If not found, call is ignored 
263  */
264 void        amok_pm_group_leave(gras_socket_t master, const char *group_name) {
265   gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_leave"),
266                    &group_name,NULL);
267   VERB3("Leaved group '%s' on %s:%d",
268         group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
269 }
270
271 /** \brief stops all members of the given local group */
272 void amok_pm_group_shutdown(const char *group_name) {
273   amok_pm_libdata_t g=gras_libdata_by_id(amok_pm_libdata_id);
274   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
275   
276   int cpt;
277   xbt_peer_t peer_it;
278
279   xbt_dynar_foreach(group, cpt, peer_it) {
280     amok_pm_kill_hp(peer_it->name, peer_it->port);
281   }
282
283   xbt_dynar_free(&group);
284   xbt_dict_remove(g->groups,group_name);
285 }
286 /** \brief stops all members of the given remote group */
287 void amok_pm_group_shutdown_remote(gras_socket_t master, const char *group_name){
288   gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_shutdown"),
289                    &group_name,NULL);
290 }