Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
leak plugs
[simgrid.git] / src / amok / PeerManagement / peermanagement.c
1 /* $Id$ */
2
3 /* amok peer management - servers main loop and remote peer stopping        */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/sysdep.h"
11 #include "xbt/peer.h"
12 #include "amok/peermanagement.h"
13
14 #include "amok/amok_modinter.h" /* prototype of my module declaration */
15 #include "gras/module.h" /* module mecanism */
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm,amok,"peer management");
18
19
20 /* data management */
21 int amok_pm_moddata_id=-1;
22 typedef struct {
23    int done;
24    xbt_dict_t groups;
25 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
26
27
28 /* Message callbacks */
29 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx,
30                            void             *payload_data) {
31
32   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
33   g->done = 1;
34   return 1;
35 }
36 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx,
37                               void             *payload_data) {
38
39   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
40   g->done = 1;
41   gras_msg_rpcreturn(30,ctx,NULL);
42   return 1;
43 }
44
45 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload) {
46   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
47   char *name = *(void**)payload;
48   xbt_dynar_t res = xbt_dict_get(g->groups, name);
49
50   gras_msg_rpcreturn(30, ctx, &res);
51   return 1;
52 }
53 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload) {
54   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
55   char *name = *(void**)payload;
56   xbt_dynar_t group = xbt_dict_get(g->groups, name);
57   
58   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
59   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
60                                  gras_socket_peer_port(exp));
61
62   VERB2("Contacted by %s:%d",dude->name,dude->port);
63   xbt_dynar_push(group,&dude);
64
65   gras_msg_rpcreturn(10, ctx, NULL);
66   free(name);
67   return 1;
68 }
69 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload) {
70   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
71   char *name = *(void**)payload;
72   xbt_dynar_t group = xbt_dict_get(g->groups, name);
73   
74   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
75   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
76                                  gras_socket_peer_port(exp));
77
78   int cpt;
79   xbt_peer_t peer_it;
80
81   xbt_dynar_foreach(group, cpt, peer_it) {
82     if (!strcmp(peer_it->name, dude->name) && 
83         peer_it->port == dude->port) {
84       xbt_dynar_cursor_rm (group,&cpt);
85       goto end;
86     }
87   }
88   WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
89         dude->name,dude->port, name);
90
91  end:
92   gras_msg_rpcreturn(30, ctx, NULL);
93   return 1;
94 }
95
96 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload) {
97   char *name = *(void**)payload;
98   amok_pm_group_shutdown(name);
99
100   gras_msg_rpcreturn(30, ctx, NULL);
101   return 1;
102 }
103
104 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
105 void amok_pm_mainloop(double timeOut) {
106   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
107   
108   while (!g->done) {
109     gras_msg_handle(timeOut);
110   }
111 }
112
113 /** \brief kill a buddy identified by its peername and port */
114 void amok_pm_kill_hp(char *name,int port) {
115   gras_socket_t sock=gras_socket_client(name,port);
116   amok_pm_kill(sock);
117   gras_socket_close(sock);
118 }
119
120 /** \brief kill a buddy to which we have a socket already */
121 void amok_pm_kill(gras_socket_t buddy) {
122   gras_msg_send(buddy,gras_msgtype_by_name("amok_pm_kill"),NULL);
123 }
124
125 /** \brief kill syncronously a buddy (do not return before its death) */
126 void amok_pm_kill_sync(gras_socket_t buddy) {
127   gras_msg_rpccall(buddy,30,gras_msgtype_by_name("amok_pm_killrpc"),NULL,NULL);
128 }
129
130
131 /** \brief create a new peermanagement group located on local peer 
132  *
133  * The dynar elements are of type xbt_peer_t
134  */
135 xbt_dynar_t amok_pm_group_new(const char *group_name) {
136   amok_pm_moddata_t g;
137   xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
138                                   xbt_peer_free_voidp);
139
140   xbt_assert0(amok_pm_moddata_id != -1,"Run amok_pm_init first!");
141   g=gras_moddata_by_id(amok_pm_moddata_id);
142
143   DEBUG1("retrieved groups=%p",g->groups);
144    
145   xbt_dict_set(g->groups,group_name,res,NULL); /*FIXME: leaking xbt_dynar_free_voidp);*/
146   VERB1("Group %s created",group_name);
147
148   return res;
149 }
150 /** \brief retrieve all members of the given remote group */
151 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name) {
152   xbt_dynar_t res;
153   
154   gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_get"),
155                    &group_name,&res);
156   return res;
157 }
158
159 /** \brief add current peer to the given remote group */
160 void        amok_pm_group_join(gras_socket_t master, const char *group_name) {
161   VERB3("Join group '%s' on %s:%d",
162         group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
163   gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_join"),
164                    &group_name,NULL);
165   VERB3("Joined group '%s' on %s:%d",
166         group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
167 }
168 /** \brief remove current peer from the given remote group if found
169  *
170  * If not found, call is ignored 
171  */
172 void        amok_pm_group_leave(gras_socket_t master, const char *group_name) {
173   gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_leave"),
174                    &group_name,NULL);
175   VERB3("Leaved group '%s' on %s:%d",
176         group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
177 }
178
179 /** \brief stops all members of the given local group */
180 void amok_pm_group_shutdown(const char *group_name) {
181   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
182   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
183   
184   int cpt;
185   xbt_peer_t peer_it;
186
187   xbt_dynar_foreach(group, cpt, peer_it) {
188     amok_pm_kill_hp(peer_it->name, peer_it->port);
189   }
190
191   xbt_dynar_free(&group);
192   xbt_dict_remove(g->groups,group_name);
193 }
194 /** \brief stops all members of the given remote group */
195 void amok_pm_group_shutdown_remote(gras_socket_t master, const char *group_name){
196   gras_msg_rpccall(master,30,gras_msgtype_by_name("amok_pm_shutdown"),
197                    &group_name,NULL);
198 }
199
200
201 /* *
202  * *
203  * * Module management functions
204  * *
205  * */
206
207
208
209 static void _amok_pm_init(void) {
210    /* no world-wide globals */
211    /* Datatype and message declarations */
212    gras_datadesc_type_t pm_group_type = gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"), xbt_peer_free_voidp);
213    
214    gras_msgtype_declare("amok_pm_kill",NULL);   
215    gras_msgtype_declare_rpc("amok_pm_killrpc",NULL,NULL);   
216    
217    gras_msgtype_declare_rpc("amok_pm_get",
218                             gras_datadesc_by_name("string"),
219                             pm_group_type);
220    gras_msgtype_declare_rpc("amok_pm_join",
221                             gras_datadesc_by_name("string"),
222                             NULL);
223    gras_msgtype_declare_rpc("amok_pm_leave",
224                             gras_datadesc_by_name("string"),
225                             NULL);
226    
227    gras_msgtype_declare_rpc("amok_pm_shutdown",
228                             gras_datadesc_by_name("string"),
229                             NULL);   
230 }
231
232 static void _amok_pm_join(void *p) {
233    /* moddata management */
234    amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
235
236    mod->groups = NULL;
237    
238    mod->done = 0;
239    mod->groups = xbt_dict_new();
240    
241    /* callbacks */
242   gras_cb_register(gras_msgtype_by_name("amok_pm_kill"),
243                    &amok_pm_cb_kill);
244   gras_cb_register(gras_msgtype_by_name("amok_pm_killrpc"),
245                    &amok_pm_cb_killrpc);
246
247   gras_cb_register(gras_msgtype_by_name("amok_pm_get"),
248                    &amok_pm_cb_get);
249   gras_cb_register(gras_msgtype_by_name("amok_pm_join"),
250                    &amok_pm_cb_join);
251   gras_cb_register(gras_msgtype_by_name("amok_pm_leave"),
252                    &amok_pm_cb_leave);
253   gras_cb_register(gras_msgtype_by_name("amok_pm_shutdown"),
254                    &amok_pm_cb_shutdown);   
255 }
256 static void _amok_pm_exit(void) {
257   /* no world-wide globals */
258 }
259 static void _amok_pm_leave(void *p) {
260    /* moddata */
261    amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
262    
263    if (mod->groups)
264      xbt_dict_free(&mod->groups);
265    
266    /* callbacks */
267    gras_cb_unregister(gras_msgtype_by_name("amok_pm_kill"),
268                       &amok_pm_cb_kill);
269    gras_cb_unregister(gras_msgtype_by_name("amok_pm_killrpc"),
270                       &amok_pm_cb_killrpc);
271
272    gras_cb_unregister(gras_msgtype_by_name("amok_pm_get"),
273                       &amok_pm_cb_get);
274    gras_cb_unregister(gras_msgtype_by_name("amok_pm_join"),
275                       &amok_pm_cb_join);
276    gras_cb_unregister(gras_msgtype_by_name("amok_pm_leave"),
277                       &amok_pm_cb_leave);
278
279    gras_cb_unregister(gras_msgtype_by_name("amok_pm_shutdown"),
280                       &amok_pm_cb_shutdown);
281 }
282
283 void amok_pm_modulecreate() {
284   gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t), &amok_pm_moddata_id,
285                   &_amok_pm_init,&_amok_pm_exit,&_amok_pm_join,&_amok_pm_leave);
286 }
287
288
289
290 /* *
291  * *
292  * * Old module functions (kept for compatibility)
293  * *
294  * */
295 /** \brief Initialize the peer management module. Every process must run it before use */
296 void amok_pm_init() {
297   gras_module_join("amok_pm");
298 }
299
300 /** \brief Finalize the peer management module. Every process should run it after use */
301 void amok_pm_exit() {
302   gras_module_leave("amok_pm");
303 }