xbt_dict_t groups;
} s_amok_pm_moddata_t, *amok_pm_moddata_t;
+/* Payload of join message: the name of the group to join and the rank
+ * requested in it (the join callback appends the peer when rank is negative) */
+typedef struct {
+ char *group; /* key used to look the group up in the module's groups dict */
+ int rank; /* requested slot in the group's dynar; negative means "append" */
+} s_amok_pm_msg_join_t,*amok_pm_msg_join_t;
/* Message callbacks */
static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx,
}
static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload) {
amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
- char *name = *(void**)payload;
- xbt_dynar_t group = xbt_dict_get(g->groups, name);
+ amok_pm_msg_join_t msg = *(amok_pm_msg_join_t*)payload; /* payload points to a join-message pointer */
+ xbt_dynar_t group = xbt_dict_get(g->groups, msg->group); /* NOTE(review): behavior for an unknown group name depends on xbt_dict_get -- confirm */
gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
gras_socket_peer_port(exp));
-
- VERB2("Contacted by %s:%d",dude->name,dude->port);
- xbt_dynar_push(group,&dude);
-
+ xbt_peer_t previous = NULL; /* current occupant of the requested rank, if any */
+ /* Look for an occupant only when an explicit rank inside the current length is requested */
+ if (msg->rank >= 0 && xbt_dynar_length(group) >= msg->rank +1)
+ xbt_dynar_get_cpy(group,msg->rank,&previous);
+ /* A negative rank appends the peer; otherwise the requested slot must not be occupied */
+ VERB3("Contacted by %s:%d for rank %d",dude->name,dude->port,msg->rank);
+ if (msg->rank < 0) {
+ xbt_dynar_push(group,&dude);
+ } else {
+ if (previous)
+ THROW4(arg_error,0,"You wanted to get rank %d of group %s, but %s:%d is already there",
+ msg->rank, msg->group, previous->name, previous->port);
+ xbt_dynar_set(group,msg->rank,&dude); /* store the new peer at the requested rank */
+ }
+ /* NOTE(review): if THROW4 above does not return, dude, msg->group and msg are never released on that path -- confirm */
gras_msg_rpcreturn(10, ctx, NULL);
- free(name);
+ free(msg->group); /* the callback releases the received payload: the string first... */
+ free(msg); /* ...then the struct that held it */
return 0;
}
static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload) {
return res;
}
-/** \brief add current peer to the given remote group */
-void amok_pm_group_join(gras_socket_t master, const char *group_name) {
- VERB3("Join group '%s' on %s:%d",
- group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
- gras_msg_rpccall(master,30,"amok_pm_join", &group_name,NULL);
+/** \brief add current peer to the given remote group
+ *
+ * \param master socket of the peer the join request is sent to
+ * \param group_name name of the group to join (neither modified nor freed here)
+ * \param rank position wanted in the group, or -1 to let the system choose
+ *
+ * You should provide rank only when you want to force the order of members
+ * (for example in strict test cases). Give -1 to let the system choose it for you.
+ */
+void amok_pm_group_join(gras_socket_t master, const char *group_name,int rank) {
+ /* The request lives on the stack: it is only needed for the duration of the
+ * call, so nothing stays allocated if the RPC does not return normally */
+ s_amok_pm_msg_join_t request;
+ amok_pm_msg_join_t msg = &request; /* the message payload is a pointer to the struct */
+ request.group = (char*)group_name; /* cast drops const; NOTE(review): assumes the call only reads it */
+ request.rank = rank;
+ VERB4("Join group '%s' on %s:%d (at rank %d)",
+ group_name,gras_socket_peer_name(master),gras_socket_peer_port(master),rank);
+ gras_msg_rpccall(master,30,"amok_pm_join", &msg,NULL);
VERB3("Joined group '%s' on %s:%d",
group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
}
/* Datatype and message declarations */
gras_datadesc_type_t pm_group_type = gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"), xbt_peer_free_voidp);
+ /* Describe the join payload (group name + rank) under its own type names, not those of the module data struct */
+ gras_datadesc_type_t msg_join_t = gras_datadesc_struct("s_amok_pm_msg_join_t");
+ gras_datadesc_struct_append(msg_join_t,"group", gras_datadesc_by_name("string"));
+ gras_datadesc_struct_append(msg_join_t,"rank", gras_datadesc_by_name("int"));
+ gras_datadesc_struct_close(msg_join_t);
+ msg_join_t=gras_datadesc_ref("amok_pm_msg_join_t", gras_datadesc_by_name("s_amok_pm_msg_join_t")); /* the message carries a pointer to the struct */
+
gras_msgtype_declare("amok_pm_kill",NULL);
gras_msgtype_declare_rpc("amok_pm_killrpc",NULL,NULL);
gras_msgtype_declare_rpc("amok_pm_get",
gras_datadesc_by_name("string"),
pm_group_type);
- gras_msgtype_declare_rpc("amok_pm_join",
- gras_datadesc_by_name("string"),
- NULL);
+ gras_msgtype_declare_rpc("amok_pm_join", msg_join_t,NULL);
gras_msgtype_declare_rpc("amok_pm_leave",
gras_datadesc_by_name("string"),
NULL);
}
static void _amok_pm_join(void *p) {
- /* moddata management */
- amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
+ /* moddata management */
+ amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
- mod->groups = NULL;
-
- mod->done = 0;
- mod->groups = xbt_dict_new();
-
- /* callbacks */
+ mod->groups = NULL;
+
+ mod->done = 0;
+ mod->groups = xbt_dict_new();
+
+ /* callbacks */
gras_cb_register("amok_pm_kill", &amok_pm_cb_kill);
gras_cb_register("amok_pm_killrpc",&amok_pm_cb_killrpc);