X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/be25aefab9c2c610e764c6b3ddd0df1c2c9a4797..7d1dc2ebb532ebc2ce507519cae63cd6b213e881:/examples/gras/pmm/pmm.c diff --git a/examples/gras/pmm/pmm.c b/examples/gras/pmm/pmm.c old mode 100755 new mode 100644 index ecfb020b0f..38c834c4a9 --- a/examples/gras/pmm/pmm.c +++ b/examples/gras/pmm/pmm.c @@ -1,7 +1,7 @@ -/* $Id$ */ /* pmm - parallel matrix multiplication "double diffusion" */ -/* Copyright (c) 2006-2008 The SimGrid team. All rights reserved. */ +/* Copyright (c) 2006, 2007, 2008, 2009, 2010. The SimGrid Team. + * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -20,32 +20,34 @@ const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE; XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication"); /* struct for recovering results */ -GRAS_DEFINE_TYPE(s_result, struct s_result { - int linepos; - int rowpos; xbt_matrix_t C GRAS_ANNOTE(subtype, double);}); +XBT_DEFINE_TYPE(s_result, struct s_result { + int linepos; int rowpos; + xbt_matrix_t C XBT_ANNOTE(subtype, double); + }); typedef struct s_result result_t; /* struct to send initial data to slave */ -GRAS_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment { +XBT_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment { int linepos; int rowpos; xbt_peer_t line[NEIGHBOR_COUNT]; xbt_peer_t row[NEIGHBOR_COUNT]; - xbt_matrix_t A GRAS_ANNOTE(subtype, double); - xbt_matrix_t B GRAS_ANNOTE(subtype, double);}); + xbt_matrix_t A XBT_ANNOTE(subtype, double); + xbt_matrix_t B XBT_ANNOTE(subtype, double); + }); typedef struct s_pmm_assignment s_pmm_assignment_t; /* register messages which may be sent (common to client and server) */ static void register_messages(void) { - gras_datadesc_type_t result_type; - gras_datadesc_type_t pmm_assignment_type; + xbt_datadesc_type_t result_type; + xbt_datadesc_type_t pmm_assignment_type; - gras_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT); - result_type = gras_datadesc_by_symbol(s_result); - pmm_assignment_type = gras_datadesc_by_symbol(s_pmm_assignment); + xbt_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT); + result_type = xbt_datadesc_by_symbol(s_result); + pmm_assignment_type = xbt_datadesc_by_symbol(s_pmm_assignment); /* receive a final result from slave */ gras_msgtype_declare("result", result_type); @@ -55,11 +57,42 @@ static void register_messages(void) /* send data between slaves */ gras_msgtype_declare("dataA", - gras_datadesc_matrix(gras_datadesc_by_name("double"), - NULL)); + xbt_datadesc_matrix(xbt_datadesc_by_name + ("double"), NULL)); gras_msgtype_declare("dataB", - gras_datadesc_matrix(gras_datadesc_by_name("double"), - NULL)); + xbt_datadesc_matrix(xbt_datadesc_by_name + ("double"), NULL)); + + /* synchronization message */ + gras_msgtype_declare("pmm_sync", 0); +} + +static xbt_socket_t try_gras_socket_client_from_string(const char *host) +{ + volatile xbt_socket_t sock = NULL; + xbt_ex_t e; + TRY { + sock = gras_socket_client_from_string(host); + } + CATCH(e) { + if (e.category != system_error) + /* dunno what happened, let the exception go through */ + RETHROWF("Unable to connect to the server: %s"); + xbt_ex_free(e); + } + return sock; +} + +static void my_gras_msg_wait(double timeout, const char* msgt_want, + xbt_socket_t* expeditor, void *payload, + const char *error_msg) +{ + TRY { + gras_msg_wait(timeout, msgt_want, expeditor, payload); + } + CATCH_ANONYMOUS { + RETHROWF("%s: %s", error_msg); + } } /* Function prototypes */ @@ -78,7 +111,6 @@ typedef struct { int remaining_ack; } master_data_t; - int master(int argc, char *argv[]) { @@ -87,11 +119,11 @@ int master(int argc, char *argv[]) xbt_matrix_t A, B, C; result_t result; - gras_socket_t from; + xbt_socket_t from; xbt_dynar_t peers; /* group of slaves */ xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */ - gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */ + xbt_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */ /* Init the GRAS's infrastructure */ gras_init(&argc, argv); @@ -104,21 +136,25 @@ int master(int argc, char *argv[]) C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE); /* Create the connexions */ - xbt_assert0(argc > 1, "Usage: master "); + xbt_assert(argc > 1, "Usage: master "); gras_socket_server(atoi(argv[1])); peers = amok_pm_group_new("pmm"); /* friends, we're ready. Come and play */ - INFO0("Wait for peers for 5 sec"); - gras_msg_handleall(5); - INFO1("Got %ld pals", xbt_dynar_length(peers)); + XBT_INFO("Wait for peers for 2 sec"); + gras_msg_handleall(2); + while (xbt_dynar_length(peers) < SLAVE_COUNT) { + XBT_INFO("Got only %ld pals (of %d). Wait 2 more seconds", + xbt_dynar_length(peers),SLAVE_COUNT); + gras_msg_handleall(2); + } + XBT_INFO("Good. Got %ld pals", xbt_dynar_length(peers)); for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) { - xbt_dynar_get_cpy(peers, i, &grid[i]); socket[i] = gras_socket_client(grid[i]->name, grid[i]->port); } - xbt_assert2(i == SLAVE_COUNT, + xbt_assert(i == SLAVE_COUNT, "Not enough slaves for this setting (got %d of %d). Change the deployment file", i, SLAVE_COUNT); @@ -127,7 +163,7 @@ int master(int argc, char *argv[]) xbt_peer_t h; xbt_dynar_remove_at(peers, i, &h); - INFO2("Too much slaves. Killing %s:%d", h->name, h->port); + XBT_INFO("Too much slaves. Killing %s:%d", h->name, h->port); amok_pm_kill_hp(h->name, h->port); free(h); } @@ -135,7 +171,7 @@ int master(int argc, char *argv[]) /* Assign job to slaves */ int row = 0, line = 0; - INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication"); + XBT_INFO("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication"); for (i = 0; i < SLAVE_COUNT; i++) { s_pmm_assignment_t assignment; int j, k; @@ -162,8 +198,9 @@ int master(int argc, char *argv[]) submatrix_size * line, submatrix_size * row, NULL); assignment.B = - xbt_matrix_new_sub(B, submatrix_size, submatrix_size, - submatrix_size * line, submatrix_size * row, NULL); + xbt_matrix_new_sub(B, submatrix_size, submatrix_size, + submatrix_size * line, submatrix_size * row, + NULL); row++; if (row >= PROC_MATRIX_SIZE) { row = 0; @@ -175,12 +212,20 @@ int master(int argc, char *argv[]) xbt_matrix_free(assignment.B); } - /* (have a rest while the slave perform the multiplication) */ + /* synchronize slaves */ + for (i = 0; i < PROC_MATRIX_SIZE; i++) { + int j; + for (j = 0; j < SLAVE_COUNT; j++) + gras_msg_wait(600, "pmm_sync", NULL, NULL); + for (j = 0; j < SLAVE_COUNT; j++) + gras_msg_send(socket[j], "pmm_sync", NULL); + } /* Retrieve the results */ for (i = 0; i < SLAVE_COUNT; i++) { gras_msg_wait(6000, "result", &from, &result); - VERB2("%d slaves are done already. Waiting for %d", i + 1, SLAVE_COUNT); + XBT_VERB("%d slaves are done already. Waiting for %d", i + 1, + SLAVE_COUNT); xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size, submatrix_size * result.linepos, submatrix_size * result.rowpos, 0, 0, NULL); @@ -188,19 +233,23 @@ int master(int argc, char *argv[]) } /* end of gather */ - if (DATA_MATRIX_SIZE < 30) { - INFO0("The Result of Multiplication is :"); - xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double); - } else { - INFO1("Matrix size too big (%d>30) to be displayed here", - DATA_MATRIX_SIZE); + if (xbt_matrix_double_is_seq(C)) + XBT_INFO("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations"); + else { + XBT_WARN("the result seems wrong"); + if (DATA_MATRIX_SIZE < 30) { + XBT_INFO("The Result of Multiplication is :"); + xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double); + } else { + XBT_INFO("Matrix size too big (%d>30) to be displayed here", + DATA_MATRIX_SIZE); + } } amok_pm_group_shutdown("pmm"); /* Ok, we're out of here */ - for (i = 0; i < SLAVE_COUNT; i++) { + for (i = 0; i < SLAVE_COUNT; i++) gras_socket_close(socket[i]); - } xbt_matrix_free(A); xbt_matrix_free(B); @@ -217,9 +266,7 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload) { /* Recover my initialized Data and My Position */ s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload; - gras_socket_t master = gras_msg_cb_ctx_from(ctx); - - xbt_ex_t e; + xbt_socket_t master = gras_msg_cb_ctx_from(ctx); int step, l; xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size, @@ -230,15 +277,15 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload) int myline, myrow; xbt_matrix_t mydataA, mydataB; xbt_matrix_t bC = - xbt_matrix_double_new_zeros(submatrix_size, submatrix_size); + xbt_matrix_double_new_zeros(submatrix_size, submatrix_size); result_t result; - gras_socket_t from; /* to exchange data with my neighbor */ + xbt_socket_t from; /* to exchange data with my neighbor */ /* sockets for brodcast to other slave */ - gras_socket_t socket_line[PROC_MATRIX_SIZE - 1]; - gras_socket_t socket_row[PROC_MATRIX_SIZE - 1]; + xbt_socket_t socket_line[PROC_MATRIX_SIZE - 1]; + xbt_socket_t socket_row[PROC_MATRIX_SIZE - 1]; memset(socket_line, 0, sizeof(socket_line)); memset(socket_row, 0, sizeof(socket_row)); @@ -251,7 +298,10 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload) mydataA = assignment.A; mydataB = assignment.B; - INFO2("Receive my pos (%d,%d) and assignment", myline, myrow); + if (gras_if_RL()) + XBT_INFO("Receive my pos and assignment"); + else + XBT_INFO("Receive my pos (%d,%d) and assignment", myline, myrow); /* Get my neighborhood from the assignment message (skipping myself) */ for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) { @@ -266,12 +316,15 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload) } for (step = 0; step < PROC_MATRIX_SIZE; step++) { + gras_msg_send(master, "pmm_sync", NULL); + gras_msg_wait(600, "pmm_sync", NULL, NULL); /* a line brodcast */ if (myline == step) { - INFO2("LINE: step(%d) = Myline(%d). Broadcast my data.", step, myline); + XBT_VERB("LINE: step(%d) = Myline(%d). Broadcast my data.", step, + myline); for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) { - INFO1("LINE: Send to %s", gras_socket_peer_name(socket_row[l])); + XBT_VERB("LINE: Send to %s", xbt_socket_peer_name(socket_row[l])); gras_msg_send(socket_row[l], "dataB", &mydataB); } @@ -280,41 +333,34 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload) bB = xbt_matrix_new_sub(mydataB, submatrix_size, submatrix_size, 0, 0, NULL); } else { - TRY { - xbt_matrix_free(bB); - gras_msg_wait(600, "dataB", &from, &bB); - } - CATCH(e) { - RETHROW0("Can't get a data message from line : %s"); - } - INFO3("LINE: step(%d) <> Myline(%d). Receive data from %s", step, - myline, gras_socket_peer_name(from)); + xbt_matrix_free(bB); + my_gras_msg_wait(600, "dataB", &from, &bB, + "Can't get a data message from line"); + XBT_VERB("LINE: step(%d) <> Myline(%d). Receive data from %s", step, + myline, xbt_socket_peer_name(from)); } /* a row brodcast */ if (myrow == step) { - INFO2("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow); + XBT_VERB("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow); for (l = 1; l < PROC_MATRIX_SIZE; l++) { - INFO1("ROW: Send to %s", gras_socket_peer_name(socket_line[l - 1])); + XBT_VERB("ROW: Send to %s", + xbt_socket_peer_name(socket_line[l - 1])); gras_msg_send(socket_line[l - 1], "dataA", &mydataA); } xbt_matrix_free(bA); bA = xbt_matrix_new_sub(mydataA, submatrix_size, submatrix_size, 0, 0, NULL); } else { - TRY { - xbt_matrix_free(bA); - gras_msg_wait(1200, "dataA", &from, &bA); - } - CATCH(e) { - RETHROW0("Can't get a data message from row : %s"); - } - INFO3("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow, - gras_socket_peer_name(from)); + xbt_matrix_free(bA); + my_gras_msg_wait(1200, "dataA", &from, &bA, + "Can't get a data message from row"); + XBT_VERB("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow, + xbt_socket_peer_name(from)); } xbt_matrix_double_addmult(bA, bB, bC); - }; + } /* send Result to master */ result.C = bC; @@ -324,11 +370,11 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload) TRY { gras_msg_send(master, "result", &result); } - CATCH(e) { - RETHROW0("Failed to send answer to server: %s"); + CATCH_ANONYMOUS { + RETHROWF("Failed to send answer to server: %s"); } - INFO2(">>>>>>>> Result sent to %s:%d <<<<<<<<", - gras_socket_peer_name(master), gras_socket_peer_port(master)); + XBT_VERB(">>>>>>>> Result sent to %s:%d <<<<<<<<", + xbt_socket_peer_name(master), xbt_socket_peer_port(master)); /* Free the allocated resources, and shut GRAS down */ xbt_matrix_free(bA); @@ -352,9 +398,8 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload) int slave(int argc, char *argv[]) { - gras_socket_t mysock; - gras_socket_t master = NULL; - int connected = 0; + xbt_socket_t mysock; + xbt_socket_t master = NULL; int rank; /* Init the GRAS's infrastructure */ @@ -373,23 +418,12 @@ int slave(int argc, char *argv[]) /* Create the connexions */ mysock = gras_socket_server_range(3000, 9999, 0, 0); - INFO1("Sensor %d starting", rank); - while (!connected) { - xbt_ex_t e; - TRY { - master = gras_socket_client_from_string(argv[1]); - connected = 1; - } - CATCH(e) { - if (e.category != system_error) - RETHROW; - xbt_ex_free(e); - gras_os_sleep(0.5); - } - } + XBT_INFO("Sensor %d starting", rank); + while (!(master = try_gras_socket_client_from_string(argv[1]))) + gras_os_sleep(0.5); /* Join and run the group */ - amok_pm_group_join(master, "pmm", rank); + rank = amok_pm_group_join(master, "pmm"); amok_pm_mainloop(600); /* housekeeping */