-#define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
+/* Parenthesized: the bare expansion "PROC_MATRIX_SIZE - 1" mis-associates
+ * inside larger expressions (e.g. 2*NEIGHBOR_COUNT). */
+#define NEIGHBOR_COUNT (PROC_MATRIX_SIZE - 1)
#define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
-#define DATA_MATRIX_SIZE 9
+/* Keep this a multiple of PROC_MATRIX_SIZE: submatrix_size below uses integer
+ * division, and each slave's result block is copied into C at multiples of
+ * submatrix_size, so any remainder rows/columns of C would never be filled. */
+#define DATA_MATRIX_SIZE 18
const int submatrix_size = DATA_MATRIX_SIZE/PROC_MATRIX_SIZE;
XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication");
C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE);
/* Create the connexions */
+ xbt_assert0(argc>1, "Usage: master <port>");
gras_socket_server(atoi(argv[1]));
peers=amok_pm_group_new("pmm");
- INFO0("Wait for peers for 10 sec");
- gras_msg_handleall(10); /* friends, we're ready. Come and play */
+
+ /* friends, we're ready. Come and play */
+ INFO0("Wait for peers for 5 sec");
+ gras_msg_handleall(5);
INFO1("Got %ld pals",xbt_dynar_length(peers));
for (i=0;
xbt_dynar_get_cpy(peers,i,&grid[i]);
socket[i]=gras_socket_client(grid[i]->name,grid[i]->port);
- INFO2("Connected to %s:%d.",grid[i]->name,grid[i]->port);
}
xbt_assert2(i==SLAVE_COUNT,
"Not enough slaves for this setting (got %d of %d). Change the deployment file",
for (i=SLAVE_COUNT; i<xbt_dynar_length(peers); ) {
xbt_peer_t h;
- xbt_dynar_get_cpy(peers,i,&h);
+ xbt_dynar_remove_at(peers,i,&h);
+ INFO2("Too much slaves. Killing %s:%d", h->name, h->port);
amok_pm_kill_hp(h->name,h->port);
free(h);
}
line++;
}
- gras_msg_send(socket[i],gras_msgtype_by_name("pmm_slave"),&assignment);
+ gras_msg_send(socket[i],"pmm_slave",&assignment);
xbt_matrix_free(assignment.A);
xbt_matrix_free(assignment.B);
}
/* Retrieve the results */
for( i=0;i< SLAVE_COUNT;i++){
- gras_msg_wait(6000,gras_msgtype_by_name("result"),&from,&result);
+ gras_msg_wait(6000,"result",&from,&result);
VERB2("%d slaves are done already. Waiting for %d",i+1, SLAVE_COUNT);
xbt_matrix_copy_values(C,result.C, submatrix_size,submatrix_size,
submatrix_size*result.linepos,
INFO2("LINE: Send to %s:%d",
gras_socket_peer_name(socket_row[l]),
gras_socket_peer_port(socket_row[l]));
- gras_msg_send(socket_row[l],
- gras_msgtype_by_name("dataB"),
- &mydataB);
+ gras_msg_send(socket_row[l], "dataB", &mydataB);
}
} else {
TRY {
xbt_matrix_free(bB);
- gras_msg_wait(600,gras_msgtype_by_name("dataB"),&from,&bB);
+ gras_msg_wait(600,"dataB",&from,&bB);
} CATCH(e) {
RETHROW0("Can't get a data message from line : %s");
}
INFO2("ROW: Send to %s:%d",
gras_socket_peer_name(socket_line[l-1]),
gras_socket_peer_port(socket_line[l-1]));
- gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA);
+ gras_msg_send(socket_line[l-1],"dataA", &mydataA);
}
xbt_matrix_free(bA);
bA = xbt_matrix_new_sub(mydataA,
} else {
TRY {
xbt_matrix_free(bA);
- gras_msg_wait(1200,gras_msgtype_by_name("dataA"), &from,&bA);
+ gras_msg_wait(1200,"dataA", &from,&bA);
} CATCH(e) {
RETHROW0("Can't get a data message from row : %s");
}
result.rowpos=myrow;
TRY {
- gras_msg_send(master, gras_msgtype_by_name("result"),&result);
+ gras_msg_send(master, "result",&result);
} CATCH(e) {
RETHROW0("Failed to send answer to server: %s");
}
xbt_matrix_free(mydataA);
xbt_matrix_free(mydataB);
+ /* FIXME: some are said to be unknown
gras_socket_close(master);
gras_socket_close(from);
- /* FIXME: some are said to be unknown
for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
if (socket_line[l])
gras_socket_close(socket_line[l]);
gras_socket_close(socket_row[l]);
}*/
- return 1;
+ return 0;
}
int slave(int argc,char *argv[]) {
gras_socket_t mysock;
- gras_socket_t master;
+ gras_socket_t master = NULL;
+ int connected = 0;
/* Init the GRAS's infrastructure */
gras_init(&argc, argv);
/* Register the known messages and my callback */
register_messages();
- gras_cb_register(gras_msgtype_by_name("pmm_slave"),pmm_worker_cb);
+ gras_cb_register("pmm_slave",pmm_worker_cb);
/* Create the connexions */
mysock = gras_socket_server_range(3000,9999,0,0);
- INFO1("Sensor starting (on port %d)",gras_os_myport());
- gras_os_sleep(2); /* let the master get ready */
- master = gras_socket_client_from_string(argv[1]);
+ INFO1("Sensor starting (on port %d)",gras_os_myport());
+ while (!connected) {
+ xbt_ex_t e;
+ TRY {
+ master = gras_socket_client_from_string(argv[1]);
+ connected = 1;
+ } CATCH(e) {
+ if (e.category != system_error)
+ RETHROW;
+ xbt_ex_free(e);
+ gras_os_sleep(0.5);
+ }
+ }
/* Join and run the group */
amok_pm_group_join(master,"pmm");