/* $Id$ */
/* pmm - parallel matrix multiplication "double diffusion" */
-/* Copyright (c) 2006 Ahmed Harbaoui. */
-/* Copyright (c) 2006 Martin Quinson. */
-/* All rights reserved. */
+/* Copyright (c) 2006-2008 The SimGrid team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
/* a line brodcast */
if(myline==step){
- INFO3("LINE: step(%d) = Myline(%d). Broadcast my data (myport=%d).",
- step,myline,gras_os_myport());
+ INFO2("LINE: step(%d) = Myline(%d). Broadcast my data.",
+ step,myline);
for (l=0;l < PROC_MATRIX_SIZE-1 ;l++) {
- INFO2("LINE: Send to %s:%d",
- gras_socket_peer_name(socket_row[l]),
- gras_socket_peer_port(socket_row[l]));
+ INFO1("LINE: Send to %s",
+ gras_socket_peer_name(socket_row[l]));
gras_msg_send(socket_row[l], "dataB", &mydataB);
}
} CATCH(e) {
RETHROW0("Can't get a data message from line : %s");
}
- INFO4("LINE: step(%d) <> Myline(%d). Receive data from %s:%d",step,myline,
- gras_socket_peer_name(from), gras_socket_peer_port(from));
+ INFO3("LINE: step(%d) <> Myline(%d). Receive data from %s",step,myline,
+ gras_socket_peer_name(from));
}
/* a row brodcast */
if (myrow==step) {
- INFO2("ROW: step(%d)=myrow(%d). Broadcast my data",step,myrow);
+ INFO2("ROW: step(%d)=myrow(%d). Broadcast my data.",step,myrow);
for (l=1;l < PROC_MATRIX_SIZE ; l++) {
- INFO2("ROW: Send to %s:%d",
- gras_socket_peer_name(socket_line[l-1]),
- gras_socket_peer_port(socket_line[l-1]));
+ INFO1("ROW: Send to %s",
+ gras_socket_peer_name(socket_line[l-1]));
gras_msg_send(socket_line[l-1],"dataA", &mydataA);
}
xbt_matrix_free(bA);
gras_socket_t mysock;
gras_socket_t master = NULL;
int connected = 0;
+ int rank;
/* Init the GRAS's infrastructure */
gras_init(&argc, argv);
amok_pm_init();
+ if (argc != 3 && argc !=2)
+ xbt_die("Usage: slave masterhost:masterport [rank]");
+ if (argc == 2)
+ rank = -1;
+ else
+ rank = atoi(argv[2]);
/* Register the known messages and my callback */
register_messages();
/* Create the connexions */
mysock = gras_socket_server_range(3000,9999,0,0);
- INFO1("Sensor starting (on port %d)",gras_os_myport());
+ INFO1("Sensor %d starting",rank);
while (!connected) {
xbt_ex_t e;
TRY {
}
/* Join and run the group */
- amok_pm_group_join(master,"pmm");
+ amok_pm_group_join(master,"pmm",rank);
amok_pm_mainloop(600);
/* housekeeping */
xbt_dict_t groups;
} s_amok_pm_moddata_t, *amok_pm_moddata_t;
+/* Payload of the "amok_pm_join" RPC: which group to join and, optionally,
+ * at which position in that group. */
+typedef struct {
+ char *group; /* name of the group; used as key into the groups dict by the join callback */
+ int rank; /* requested position in the group; a negative value means "append" */
+} s_amok_pm_msg_join_t,*amok_pm_msg_join_t;
/* Message callbacks */
static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx,
}
static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload) {
amok_pm_moddata_t g=gras_moddata_by_id(amok_pm_moddata_id);
- char *name = *(void**)payload;
- xbt_dynar_t group = xbt_dict_get(g->groups, name);
+ amok_pm_msg_join_t msg = *(amok_pm_msg_join_t*)payload; /* payload points to the join-message pointer */
+ xbt_dynar_t group = xbt_dict_get(g->groups, msg->group);
gras_socket_t exp = gras_msg_cb_ctx_from(ctx);
xbt_peer_t dude = xbt_peer_new(gras_socket_peer_name(exp),
gras_socket_peer_port(exp));
-
- VERB2("Contacted by %s:%d",dude->name,dude->port);
- xbt_dynar_push(group,&dude);
-
+ xbt_peer_t previous = NULL; /* peer already stored at the requested rank, if any */
+ /* Join policy: a negative rank appends the sender to the group; a
+  * non-negative rank claims that exact index and is refused when a peer
+  * is already stored there. The lookup below only happens when the
+  * requested index lies within the current length of the group. */
+ if (msg->rank >= 0 && xbt_dynar_length(group) >= msg->rank +1)
+ xbt_dynar_get_cpy(group,msg->rank,&previous);
+
+ VERB3("Contacted by %s:%d for rank %d",dude->name,dude->port,msg->rank);
+ if (msg->rank < 0) {
+ xbt_dynar_push(group,&dude);
+ } else {
+ if (previous)
+ THROW4(arg_error,0,"You wanted to get rank %d of group %s, but %s:%d is already there",
+ msg->rank, msg->group, previous->name, previous->port);
+ xbt_dynar_set(group,msg->rank,&dude);
+ }
+ /* NOTE(review): when THROW4 above fires, the rpcreturn and free() calls
+  * below appear to be skipped, so dude and the payload would not be
+  * released here and the caller gets no regular answer -- confirm. */
gras_msg_rpcreturn(10, ctx, NULL);
- free(name);
+ free(msg->group); /* the callback releases the received payload: the string first... */
+ free(msg); /* ...then the structure that held it */
return 0;
}
static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload) {
return res;
}
-/** \brief add current peer to the given remote group */
-void amok_pm_group_join(gras_socket_t master, const char *group_name) {
- VERB3("Join group '%s' on %s:%d",
- group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
- gras_msg_rpccall(master,30,"amok_pm_join", &group_name,NULL);
+/** \brief add current peer to the given remote group
+ *
+ * Sends an "amok_pm_join" RPC carrying the group name and the wanted rank.
+ *
+ * \param master socket on which the join request is sent
+ * \param group_name name of the group to join
+ * \param rank position wanted in the group, or -1 for no preference
+ *
+ * You should provide rank only when you want to force the order of members
+ * (for example in strict test cases). Give -1 to let the system choose it for you.
+ */
+void amok_pm_group_join(gras_socket_t master, const char *group_name,int rank) {
+ amok_pm_msg_join_t msg = xbt_new(s_amok_pm_msg_join_t,1);
+ msg->group = (char*)group_name; /* borrowed, not copied: only msg itself is freed below */
+ msg->rank = rank;
+ VERB4("Join group '%s' on %s:%d (at rank %d)",
+ group_name,gras_socket_peer_name(master),gras_socket_peer_port(master),rank);
+ gras_msg_rpccall(master,30,"amok_pm_join", &msg,NULL); /* the payload is the address of the msg pointer */
+ free(msg);
VERB3("Joined group '%s' on %s:%d",
group_name,gras_socket_peer_name(master),gras_socket_peer_port(master));
}
/* Datatype and message declarations */
gras_datadesc_type_t pm_group_type = gras_datadesc_dynar(gras_datadesc_by_name("xbt_peer_t"), xbt_peer_free_voidp);
+ /* Describe the join payload (s_amok_pm_msg_join_t: group name + rank) and
+  * then a reference to it, since the message carries a pointer to the
+  * structure. The description is registered under the payload's own type
+  * names rather than the module-data names it was previously filed under. */
+ gras_datadesc_type_t msg_join_t = gras_datadesc_struct("s_amok_pm_msg_join_t");
+ gras_datadesc_struct_append(msg_join_t,"group", gras_datadesc_by_name("string"));
+ gras_datadesc_struct_append(msg_join_t,"rank", gras_datadesc_by_name("int"));
+ gras_datadesc_struct_close(msg_join_t);
+ msg_join_t=gras_datadesc_ref("amok_pm_msg_join_t", gras_datadesc_by_name("s_amok_pm_msg_join_t"));
+
gras_msgtype_declare("amok_pm_kill",NULL);
gras_msgtype_declare_rpc("amok_pm_killrpc",NULL,NULL);
gras_msgtype_declare_rpc("amok_pm_get",
gras_datadesc_by_name("string"),
pm_group_type);
- gras_msgtype_declare_rpc("amok_pm_join",
- gras_datadesc_by_name("string"),
- NULL);
+ gras_msgtype_declare_rpc("amok_pm_join", msg_join_t,NULL);
gras_msgtype_declare_rpc("amok_pm_leave",
gras_datadesc_by_name("string"),
NULL);
}
static void _amok_pm_join(void *p) {
- /* moddata management */
- amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
+ /* moddata management */
+ amok_pm_moddata_t mod = (amok_pm_moddata_t)p;
- mod->groups = NULL;
-
- mod->done = 0;
- mod->groups = xbt_dict_new();
-
- /* callbacks */
+ mod->groups = NULL;
+
+ mod->done = 0;
+ mod->groups = xbt_dict_new();
+
+ /* callbacks */
gras_cb_register("amok_pm_kill", &amok_pm_cb_kill);
gras_cb_register("amok_pm_killrpc",&amok_pm_cb_killrpc);