Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a indent rule to format the code in an uniform way, avoiding breaking the branche...
[simgrid.git] / src / amok / PeerManagement / peermanagement.c
1 /* $Id$ */
2
3 /* amok peer management - servers main loop and remote peer stopping        */
4
5 /* Copyright (c) 2006 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/sysdep.h"
11 #include "xbt/peer.h"
12 #include "amok/peermanagement.h"
13
14 #include "amok/amok_modinter.h" /* prototype of my module declaration */
15 #include "gras/module.h" /* module mecanism */
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm,amok,"peer management");
18
19
20 /* data management */
21 int amok_pm_moddata_id=-1;
22 typedef struct {
23    int done;
24    xbt_dict_t groups;
25 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
26
27 /* Payload of join message */
28 typedef struct {
29    char *group;
30    int rank;
31 } s_amok_pm_msg_join_t,*amok_pm_msg_join_t;
32
33 /* Message callbacks */
34 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx,
35                            void             *payload_data) {
36
37   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
38   g->done = 1;
39   return 0;
40 }
41 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx,
42                               void             *payload_data) {
43
44   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
45   g->done = 1;
46   gras_msg_rpcreturn(30,ctx,NULL);
47   return 0;
48 }
49
50 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload) {
51   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
52   char *name = *(void**)payload;
53   xbt_dynar_t res = xbt_dict_get(g->groups, name);
54
55   gras_msg_rpcreturn(30, ctx, &res);
56   return 0;
57 }
58 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload) {
59   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
60   amok_pm_msg_join_t msg = *(amok_pm_msg_join_t*)payload;
61   xbt_dynar_t group = xbt_dict_get(g->groups, msg->group);
62   
63   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
64   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
65                                  gras_socket_peer_port(exp));
66   xbt_peer_t previous = NULL;
67  
68   if (msg->rank >= 0 && xbt_dynar_length(group) >= msg->rank +1)
69      xbt_dynar_get_cpy(group,msg->rank,&previous);
70
71   VERB3("Contacted by %s:%d for rank %d",dude->name,dude->port,msg->rank);
72   if (msg->rank < 0) {
73     xbt_dynar_push(group,&dude);
74   } else {      
75     if (previous) 
76        THROW4(arg_error,0,"You wanted to get rank %d of group %s, but %s:%d is already there",
77               msg->rank, msg->group, previous->name, previous->port);
78      xbt_dynar_set(group,msg->rank,&dude);
79   }
80    
81   gras_msg_rpcreturn(10, ctx, NULL);
82   free(msg->group);
83   free(msg);
84   return 0;
85 }
86 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload) {
87   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
88   char *name = *(void**)payload;
89   xbt_dynar_t group = xbt_dict_get(g->groups, name);
90   
91   gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
92   xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
93                                  gras_socket_peer_port(exp));
94
95   unsigned int cpt;
96   xbt_peer_t peer_it;
97
98   xbt_dynar_foreach(group, cpt, peer_it) {
99     if (!strcmp(peer_it->name, dude->name) && 
100         peer_it->port == dude->port) {
101       xbt_dynar_cursor_rm (group,&cpt);
102       goto end;
103     }
104   }
105   WARN3("Asked to remove %s:%d from group '%s', but not found. Ignoring",
106         dude->name,dude->port, name);
107
108  end:
109   gras_msg_rpcreturn(30, ctx, NULL);
110   return 0;
111 }
112
113 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload) {
114   char *name = *(void**)payload;
115   amok_pm_group_shutdown(name);
116
117   gras_msg_rpcreturn(30, ctx, NULL);
118   return 0;
119 }
120
121 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
122 void amok_pm_mainloop(double timeOut) {
123   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
124   
125   while (!g->done) {
126     gras_msg_handle(timeOut);
127   }
128 }
129
130 /** \brief kill a buddy identified by its peername and port. Note that it is not removed from any group it may belong to. */
131 void amok_pm_kill_hp(char *name,int port) {
132   gras_socket_t sock=gras_socket_client(name,port);
133   amok_pm_kill(sock);
134   gras_socket_close(sock);
135 }
136
137 /** \brief kill a buddy to which we have a socket already. Note that it is not removed from any group it may belong to. */
138 void amok_pm_kill(gras_socket_t buddy) {
139   gras_msg_send(buddy,"amok_pm_kill",NULL);
140 }
141
142 /** \brief kill syncronously a buddy (do not return before its death). Note that it is not removed from any group it may belong to. */
143 void amok_pm_kill_sync(gras_socket_t buddy) {
144   gras_msg_rpccall(buddy,30,"amok_pm_killrpc",NULL,NULL);
145 }
146
147
148 /** \brief create a new peermanagement group located on local peer 
149  *
150  * The dynar elements are of type xbt_peer_t
151  */
152 xbt_dynar_t amok_pm_group_new(const char *group_name) {
153   amok_pm_moddata_t g;
154   xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
155                                   xbt_peer_free_voidp);
156
157   xbt_assert0(amok_pm_moddata_id != -1,"Run amok_pm_init first!");
158   g=gras_moddata_by_id(amok_pm_moddata_id);
159
160   DEBUG1("retrieved groups=%p",g->groups);
161    
162   xbt_dict_set(g->groups,group_name,res,NULL); /*FIXME: leaking xbt_dynar_free_voidp);*/
163   VERB1("Group %s created",group_name);
164
165   return res;
166 }
167 /** \brief retrieve all members of the given remote group */
168 xbt_dynar_t amok_pm_group_get(gras_socket_t master, const char *group_name) {
169   xbt_dynar_t res;
170   
171   gras_msg_rpccall(master,30,"amok_pm_get", &group_name,&res);
172   return res;
173 }
174
175 /** \brief add current peer to the given remote group 
176  *
177  * You should provide rank only when you want to force the order of members 
178  * (for example in strict test cases). Give -1 to leave the system choose it for you.
179  */
180 void        amok_pm_group_join(gras_socket_t master, const char *group_name,int rank) {
181   amok_pm_msg_join_t msg = xbt_new(s_amok_pm_msg_join_t,1);
182   msg->group = (char*)group_name;
183   msg->rank = rank;
184   VERB4("Join group '%s' on %s:%d (at rank %d)",
185         group_name,gras_socket_peer_name(master),gras_socket_peer_port(master),rank);
186   gras_msg_rpccall(master,30,"amok_pm_join", &msg,NULL);
187   free(msg);
188   VERB3("Joined group '%s' on %s:%d",
189         group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
190 }
191 /** \brief remove current peer from the given remote group if found
192  *
193  * If not found, call is ignored 
194  */
195 void        amok_pm_group_leave(gras_socket_t master, const char *group_name) {
196   gras_msg_rpccall(master,30,"amok_pm_leave", &group_name,NULL);
197   VERB3("Leaved group '%s' on %s:%d",
198         group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
199 }
200
201 /** \brief stops all members of the given local group */
202 void amok_pm_group_shutdown(const char *group_name) {
203   amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
204   xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
205   
206   unsigned int cpt;
207   xbt_peer_t peer_it;
208
209   xbt_dynar_foreach(group, cpt, peer_it) {
210     amok_pm_kill_hp(peer_it->name, peer_it->port);
211   }
212
213   xbt_dynar_free(&group);
214   xbt_dict_remove(g->groups,group_name);
215 }
216 /** \brief stops all members of the given remote group */
217 void amok_pm_group_shutdown_remote(gras_socket_t master, const char *group_name){
218   gras_msg_rpccall(master,30,"amok_pm_shutdown", &group_name,NULL);
219 }
220
221
222 /* *
223  * *
224  * * Module management functions
225  * *
226  * */
227
228
229
230 static void _amok_pm_init(void) {
231    /* no world-wide globals */
232    /* Datatype and message declarations */
233    gras_datadesc_type_t pm_group_type = gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"), xbt_peer_free_voidp);
234    
235    gras_datadesc_type_t msg_join_t = gras_datadesc_struct("s_amok_pm_moddata_t");
236    gras_datadesc_struct_append(msg_join_t,"group", gras_datadesc_by_name("string"));
237    gras_datadesc_struct_append(msg_join_t,"rank", gras_datadesc_by_name("int"));
238    gras_datadesc_struct_close(msg_join_t);
239    msg_join_t=gras_datadesc_ref("amok_pm_moddata_t", gras_datadesc_by_name("s_amok_pm_moddata_t"));
240    
241    gras_msgtype_declare("amok_pm_kill",NULL);   
242    gras_msgtype_declare_rpc("amok_pm_killrpc",NULL,NULL);   
243    
244    gras_msgtype_declare_rpc("amok_pm_get",
245                             gras_datadesc_by_name("string"),
246                             pm_group_type);
247    gras_msgtype_declare_rpc("amok_pm_join", msg_join_t,NULL);
248    gras_msgtype_declare_rpc("amok_pm_leave",
249                             gras_datadesc_by_name("string"),
250                             NULL);
251    
252    gras_msgtype_declare_rpc("amok_pm_shutdown",
253                             gras_datadesc_by_name("string"),
254                             NULL);   
255 }
256
257 static void _amok_pm_join(void *p) {
258   /* moddata management */
259   amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
260
261   mod->groups = NULL;
262   
263   mod->done = 0;
264   mod->groups = xbt_dict_new();
265   
266   /* callbacks */
267   gras_cb_register("amok_pm_kill",   &amok_pm_cb_kill);
268   gras_cb_register("amok_pm_killrpc",&amok_pm_cb_killrpc);
269
270   gras_cb_register("amok_pm_get",      &amok_pm_cb_get);
271   gras_cb_register("amok_pm_join",     &amok_pm_cb_join);
272   gras_cb_register("amok_pm_leave",    &amok_pm_cb_leave);
273   gras_cb_register("amok_pm_shutdown", &amok_pm_cb_shutdown);   
274 }
275 static void _amok_pm_exit(void) {
276   /* no world-wide globals */
277 }
278 static void _amok_pm_leave(void *p) {
279    /* moddata */
280    amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
281    
282    if (mod->groups)
283      xbt_dict_free(&mod->groups);
284    
285    /* callbacks */
286    gras_cb_unregister("amok_pm_kill",   &amok_pm_cb_kill);
287    gras_cb_unregister("amok_pm_killrpc",&amok_pm_cb_killrpc);
288
289    gras_cb_unregister("amok_pm_get",      &amok_pm_cb_get);
290    gras_cb_unregister("amok_pm_join",     &amok_pm_cb_join);
291    gras_cb_unregister("amok_pm_leave",    &amok_pm_cb_leave);
292    gras_cb_unregister("amok_pm_shutdown", &amok_pm_cb_shutdown);   
293 }
294
295 void amok_pm_modulecreate() {
296   gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t), &amok_pm_moddata_id,
297                   _amok_pm_init,_amok_pm_exit,_amok_pm_join,_amok_pm_leave);
298 }
299
300
301
302 /* *
303  * *
304  * * Old module functions (kept for compatibility)
305  * *
306  * */
307 /** \brief Initialize the peer management module. Every process must run it before use */
308 void amok_pm_init() {
309   gras_module_join("amok_pm");
310 }
311
312 /** \brief Finalize the peer management module. Every process should run it after use */
313 void amok_pm_exit() {
314   gras_module_leave("amok_pm");
315 }