X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/85df79fc9b50c1121f01b0a1adc4879f4622aaf5..7d1dc2ebb532ebc2ce507519cae63cd6b213e881:/examples/gras/pmm/pmm.c diff --git a/examples/gras/pmm/pmm.c b/examples/gras/pmm/pmm.c old mode 100755 new mode 100644 index be2cdbf492..38c834c4a9 --- a/examples/gras/pmm/pmm.c +++ b/examples/gras/pmm/pmm.c @@ -1,66 +1,103 @@ -/* $Id$ */ /* pmm - parallel matrix multiplication "double diffusion" */ -/* Copyright (c) 2006- Ahmed Harbaoui. All rights reserved. */ +/* Copyright (c) 2006, 2007, 2008, 2009, 2010. The SimGrid Team. + * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ #include "gras.h" +#include "xbt/matrix.h" +#include "amok/peermanagement.h" + #define PROC_MATRIX_SIZE 3 +#define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE) -#define DATA_MATRIX_SIZE 3 - -XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication"); +#define DATA_MATRIX_SIZE 18 +const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE; -GRAS_DEFINE_TYPE(s_matrix,struct s_matrix { - int lines; - int rows; - double *data GRAS_ANNOTE(size, lines*rows); -};) -typedef struct s_matrix matrix_t; +XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication"); /* struct for recovering results */ -GRAS_DEFINE_TYPE(s_result,struct s_result { - int i; - int j; - double value; -}); +XBT_DEFINE_TYPE(s_result, struct s_result { + int linepos; int rowpos; + xbt_matrix_t C XBT_ANNOTE(subtype, double); + }); + typedef struct s_result result_t; /* struct to send initial data to slave */ -GRAS_DEFINE_TYPE(s_init_data,struct s_init_data { - int linepos; - int rowpos; - xbt_host_t line[PROC_MATRIX_SIZE]; - xbt_host_t row[PROC_MATRIX_SIZE]; - double a; - double b; -}); -typedef struct s_init_data init_data_t; +XBT_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment { + int linepos; + int rowpos; + xbt_peer_t line[NEIGHBOR_COUNT]; + xbt_peer_t row[NEIGHBOR_COUNT]; + xbt_matrix_t A XBT_ANNOTE(subtype, double); + xbt_matrix_t B XBT_ANNOTE(subtype, double); + }); + +typedef struct s_pmm_assignment s_pmm_assignment_t; /* register messages which may be sent (common to client and server) */ -static void register_messages(void) { - gras_datadesc_type_t result_type; - gras_datadesc_type_t init_data_type; - gras_datadesc_set_const("PROC_MATRIX_SIZE",PROC_MATRIX_SIZE); - result_type=gras_datadesc_by_symbol(s_result); - init_data_type=gras_datadesc_by_symbol(s_init_data); - - gras_msgtype_declare("result", result_type); // receive a final result from slave - gras_msgtype_declare("init_data", init_data_type); // send from master to slave to initialize data bA,bB - - gras_msgtype_declare("ask_result", gras_datadesc_by_name("int")); // send from master to slave to ask a final result - gras_msgtype_declare("step", gras_datadesc_by_name("int"));// send from master to slave to indicate the begining of step - gras_msgtype_declare("step_ack", gras_datadesc_by_name("int"));//send from slave to master to indicate the end of the current step - gras_msgtype_declare("dataA", gras_datadesc_by_name("double"));// send data between slave - gras_msgtype_declare("dataB", gras_datadesc_by_name("double"));// send data between slave +static void register_messages(void) +{ + xbt_datadesc_type_t result_type; + xbt_datadesc_type_t pmm_assignment_type; + + xbt_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT); + result_type = xbt_datadesc_by_symbol(s_result); + pmm_assignment_type = xbt_datadesc_by_symbol(s_pmm_assignment); + + /* receive a final result from slave */ + gras_msgtype_declare("result", result_type); + + /* send from master to slave to assign a position and some data */ + gras_msgtype_declare("pmm_slave", pmm_assignment_type); + + /* send data between slaves */ + gras_msgtype_declare("dataA", + xbt_datadesc_matrix(xbt_datadesc_by_name + ("double"), NULL)); + gras_msgtype_declare("dataB", + xbt_datadesc_matrix(xbt_datadesc_by_name + ("double"), NULL)); + + /* synchronization message */ + gras_msgtype_declare("pmm_sync", 0); +} + +static xbt_socket_t try_gras_socket_client_from_string(const char *host) +{ + volatile xbt_socket_t sock = NULL; + xbt_ex_t e; + TRY { + sock = gras_socket_client_from_string(host); + } + CATCH(e) { + if (e.category != system_error) + /* dunno what happened, let the exception go through */ + RETHROWF("Unable to connect to the server: %s"); + xbt_ex_free(e); + } + return sock; +} + +static void my_gras_msg_wait(double timeout, const char* msgt_want, + xbt_socket_t* expeditor, void *payload, + const char *error_msg) +{ + TRY { + gras_msg_wait(timeout, msgt_want, expeditor, payload); + } + CATCH_ANONYMOUS { + RETHROWF("%s: %s", error_msg); + } } /* Function prototypes */ -int slave (int argc,char *argv[]); -int master (int argc,char *argv[]); +int slave(int argc, char *argv[]); +int master(int argc, char *argv[]); /* ********************************************************************** @@ -69,360 +106,329 @@ int master (int argc,char *argv[]); /* Global private data */ typedef struct { - int nbr_row,nbr_line; + int nbr_row, nbr_line; int remaining_step; int remaining_ack; } master_data_t; +int master(int argc, char *argv[]) +{ -/*** Function initilaze matrixs ***/ - -static void initmatrix(matrix_t *X){ int i; - for(i=0 ; i<(X->lines)*(X->rows); i++) - X->data[i]=1.0;//*rand()/(RAND_MAX+1.0); -} /* end_of_initmatrixs */ - -/*** Function Scatter Sequentiel ***/ - -static void scatter(){ - -}/* end_of_Scatter */ - -/*** Function: Scatter // ***/ - -static void scatter_parl(){ - -}/* end_of_Scatter // */ - -/*** Function: multiplication ***/ - -static void multiplication(){ - -}/* end_of_multiplication */ - -/*** Function: gather ***/ - -static void gather(){ - -}/* end_of_gather */ + xbt_matrix_t A, B, C; + result_t result; -/*** Function: Display Matrix ***/ + xbt_socket_t from; -static void display(matrix_t X){ - - int i,j,t=0; + xbt_dynar_t peers; /* group of slaves */ + xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */ + xbt_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */ - printf(" "); - for(j=0;j 1, "Usage: master "); + gras_socket_server(atoi(argv[1])); + peers = amok_pm_group_new("pmm"); + + /* friends, we're ready. Come and play */ + XBT_INFO("Wait for peers for 2 sec"); + gras_msg_handleall(2); + while (xbt_dynar_length(peers) < SLAVE_COUNT) { + XBT_INFO("Got only %ld pals (of %d). Wait 2 more seconds", + xbt_dynar_length(peers),SLAVE_COUNT); + gras_msg_handleall(2); } - printf(" --"); - for(j=0;jname,grid[i-1]->port); - - INFO2("Connected to %s:%d.",grid[i-1]->name,grid[i-1]->port); + for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) { + xbt_dynar_get_cpy(peers, i, &grid[i]); + socket[i] = gras_socket_client(grid[i]->name, grid[i]->port); + } + xbt_assert(i == SLAVE_COUNT, + "Not enough slaves for this setting (got %d of %d). Change the deployment file", + i, SLAVE_COUNT); + + /* Kill surnumerous slaves */ + for (i = SLAVE_COUNT; i < xbt_dynar_length(peers);) { + xbt_peer_t h; + + xbt_dynar_remove_at(peers, i, &h); + XBT_INFO("Too much slaves. Killing %s:%d", h->name, h->port); + amok_pm_kill_hp(h->name, h->port); + free(h); } - int row=1, line=1,j; - for(i=0 ; i PROC_MATRIX_SIZE) { - row=1; + if (row >= PROC_MATRIX_SIZE) { + row = 0; line++; } - - mydata.a=A.data[(line-1)*PROC_MATRIX_SIZE+(row-1)]; - mydata.b=B.data[(line-1)*PROC_MATRIX_SIZE+(row-1)];; - - gras_msg_send(socket[i],gras_msgtype_by_name("init_data"),&mydata); - INFO3("Send Init Data to %s : data A= %.3g & data B= %.3g", - gras_socket_peer_name(socket[i]),mydata.a,mydata.b); + gras_msg_send(socket[i], "pmm_slave", &assignment); + xbt_matrix_free(assignment.A); + xbt_matrix_free(assignment.B); } - // end init Data Send - - /******************************* multiplication ********************************/ - INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication"); - - for (step=1; step <= PROC_MATRIX_SIZE; step++){ - // gras_os_sleep(50); - for (i=0; i< SLAVE_COUNT; i++){ - TRY { - gras_msg_send(socket[i], gras_msgtype_by_name("step"), &step); - } CATCH(e) { - gras_socket_close(socket[i]); - RETHROW0("Unable to send the msg : %s"); - } - } - INFO1("send to slave to begin a %d th step",step); - /* wait for computing and slave messages exchange */ - i=0; - - while ( i< SLAVE_COUNT){ - TRY { - gras_msg_wait(1300,gras_msgtype_by_name("step_ack"),&from,&step_ack); - } CATCH(e) { - RETHROW0("I Can't get a Ack step message from slave : %s"); - } - i++; - INFO3("Receive Ack step ack from %s (got %d of %d)", - gras_socket_peer_name(from), - i, SLAVE_COUNT); - } - } - /********************************* gather ***************************************/ - ask_result=0; - for( i=1;i< argc;i++){ - gras_msg_send(socket[i],gras_msgtype_by_name("ask_result"),&ask_result); - INFO1("Send (Ask Result) message to %s",gras_socket_peer_name(socket[i])); + /* synchronize slaves */ + for (i = 0; i < PROC_MATRIX_SIZE; i++) { + int j; + for (j = 0; j < SLAVE_COUNT; j++) + gras_msg_wait(600, "pmm_sync", NULL, NULL); + for (j = 0; j < SLAVE_COUNT; j++) + gras_msg_send(socket[j], "pmm_sync", NULL); } - /* wait for results */ - for( i=1;i< argc;i++){ - gras_msg_wait(600,gras_msgtype_by_name("result"),&from,&result); - C.data[(result.i-1)*DATA_MATRIX_SIZE+(result.j-1)]=result.value; + + /* Retrieve the results */ + for (i = 0; i < SLAVE_COUNT; i++) { + gras_msg_wait(6000, "result", &from, &result); + XBT_VERB("%d slaves are done already. Waiting for %d", i + 1, + SLAVE_COUNT); + xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size, + submatrix_size * result.linepos, + submatrix_size * result.rowpos, 0, 0, NULL); + xbt_matrix_free(result.C); } /* end of gather */ - INFO0 ("The Result of Multiplication is :"); - display(C); + if (xbt_matrix_double_is_seq(C)) + XBT_INFO("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations"); + else { + XBT_WARN("the result seems wrong"); + if (DATA_MATRIX_SIZE < 30) { + XBT_INFO("The Result of Multiplication is :"); + xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double); + } else { + XBT_INFO("Matrix size too big (%d>30) to be displayed here", + DATA_MATRIX_SIZE); + } + } + + amok_pm_group_shutdown("pmm"); /* Ok, we're out of here */ + + for (i = 0; i < SLAVE_COUNT; i++) + gras_socket_close(socket[i]); + + xbt_matrix_free(A); + xbt_matrix_free(B); + xbt_matrix_free(C); + gras_exit(); return 0; -} /* end_of_master */ +} /* end_of_master */ /* ********************************************************************** * slave code * **********************************************************************/ -int slave(int argc,char *argv[]) { +static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload) +{ + /* Recover my initialized Data and My Position */ + s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload; + xbt_socket_t master = gras_msg_cb_ctx_from(ctx); - xbt_ex_t e; + int step, l; + xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size, + sizeof(double), NULL); + xbt_matrix_t bB = xbt_matrix_new(submatrix_size, submatrix_size, + sizeof(double), NULL); - int step,port,l,result_ack=0; - double bA,bB; - - int myline,myrow; - double mydataA,mydataB; - double bC=0; - - // static end_step; + int myline, myrow; + xbt_matrix_t mydataA, mydataB; + xbt_matrix_t bC = + xbt_matrix_double_new_zeros(submatrix_size, submatrix_size); result_t result; - - gras_socket_t from,sock; /* to receive from server for steps */ - /* sockets for brodcast to other slave */ - gras_socket_t socket_line[PROC_MATRIX_SIZE-1]; - gras_socket_t socket_row[PROC_MATRIX_SIZE-1]; + xbt_socket_t from; /* to exchange data with my neighbor */ - /* Init the GRAS's infrastructure */ + /* sockets for brodcast to other slave */ + xbt_socket_t socket_line[PROC_MATRIX_SIZE - 1]; + xbt_socket_t socket_row[PROC_MATRIX_SIZE - 1]; + memset(socket_line, 0, sizeof(socket_line)); + memset(socket_row, 0, sizeof(socket_row)); - gras_init(&argc, argv); + int i; - /* Get arguments and create sockets */ + gras_os_sleep(1); /* wait for my pals */ - port=atoi(argv[1]); - - /* Create my master socket */ - sock = gras_socket_server(port); - int i; + myline = assignment.linepos; + myrow = assignment.rowpos; + mydataA = assignment.A; + mydataB = assignment.B; - /* Register the known messages */ - register_messages(); + if (gras_if_RL()) + XBT_INFO("Receive my pos and assignment"); + else + XBT_INFO("Receive my pos (%d,%d) and assignment", myline, myrow); - /* Recover my initialized Data and My Position*/ - init_data_t mydata; - INFO2("Launch %s (port=%d); wait for my enrole message",argv[0],port); - TRY { - gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata); - } CATCH(e) { - RETHROW0("Can't get a init Data message from master : %s"); + /* Get my neighborhood from the assignment message (skipping myself) */ + for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) { + socket_line[i] = gras_socket_client(assignment.line[i]->name, + assignment.line[i]->port); + xbt_peer_free(assignment.line[i]); } - myline=mydata.linepos; - myrow=mydata.rowpos; - mydataA=mydata.a; - mydataB=mydata.b; - INFO4("Receive MY POSITION (%d,%d) and MY INIT DATA ( A=%.3g | B=%.3g )", - myline,myrow,mydataA,mydataB); - - /* Get my neighborhood from the enrollment message */ - int j=0; - for (i=0,j=0 ; iname)) { - socket_line[j]=gras_socket_client(mydata.line[i]->name,mydata.line[i]->port); - j++; - //INFO3("Line neighbour %d: %s:%d",j,mydata.line[i]->name,mydata.line[i]->port); - } - xbt_host_free(mydata.line[i]); - } - for (i=0,j=0 ; iname)) { - socket_row[j]=gras_socket_client(mydata.row[i]->name,mydata.row[i]->port); - //INFO3("Row neighbour %d : %s:%d",j,mydata.row[i]->name,mydata.row[i]->port); - j++; - } - xbt_host_free(mydata.row[i]); + for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) { + socket_row[i] = gras_socket_client(assignment.row[i]->name, + assignment.row[i]->port); + xbt_peer_free(assignment.row[i]); } - step=1; - - do { //repeat until compute Cb - step=PROC_MATRIX_SIZE+1; // just intilization for loop - - TRY { - gras_msg_wait(200,gras_msgtype_by_name("step"),&from,&step); - } CATCH(e) { - RETHROW0("Can't get a Next Step message from master : %s"); - } - INFO1("Receive a step message from master: step = %d ",step); - - if (step < PROC_MATRIX_SIZE ){ - /* a line brodcast */ - gras_os_sleep(3); // IL FAUT EXPRIMER LE TEMPS D'ATTENTE EN FONCTION DE "SLAVE_COUNT" - if(myline==step){ - INFO2("step(%d) = Myline(%d)",step,myline); - for (l=1;l < PROC_MATRIX_SIZE ;l++){ - gras_msg_send(socket_row[l-1], gras_msgtype_by_name("dataB"), &mydataB); - bB=mydataB; - INFO1("send my data B (%.3g) to my (vertical) neighbors",bB); - } - } - if(myline != step){ - INFO2("step(%d) <> Myline(%d)",step,myline); - TRY { - gras_msg_wait(600,gras_msgtype_by_name("dataB"), - &from,&bB); - } CATCH(e) { - RETHROW0("I Can't get a data message from line : %s"); - } - INFO2("Receive data B (%.3g) from my neighbor: %s",bB,gras_socket_peer_name(from)); - } - /* a row brodcast */ - if(myrow==step){ - for (l=1;l < PROC_MATRIX_SIZE ;l++){ - gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA); - bA=mydataA; - INFO1("send my data A (%.3g) to my (horizontal) neighbors",bA); - } + for (step = 0; step < PROC_MATRIX_SIZE; step++) { + gras_msg_send(master, "pmm_sync", NULL); + gras_msg_wait(600, "pmm_sync", NULL, NULL); + + /* a line brodcast */ + if (myline == step) { + XBT_VERB("LINE: step(%d) = Myline(%d). Broadcast my data.", step, + myline); + for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) { + XBT_VERB("LINE: Send to %s", xbt_socket_peer_name(socket_row[l])); + gras_msg_send(socket_row[l], "dataB", &mydataB); } - if(myrow != step){ - TRY { - gras_msg_wait(1200,gras_msgtype_by_name("dataA"), - &from,&bA); - } CATCH(e) { - RETHROW0("I Can't get a data message from row : %s"); - } - INFO2("Receive data A (%.3g) from my neighbor : %s ",bA,gras_socket_peer_name(from)); + + xbt_matrix_free(bB); + bB = xbt_matrix_new_sub(mydataB, + submatrix_size, submatrix_size, 0, 0, NULL); + } else { + xbt_matrix_free(bB); + my_gras_msg_wait(600, "dataB", &from, &bB, + "Can't get a data message from line"); + XBT_VERB("LINE: step(%d) <> Myline(%d). Receive data from %s", step, + myline, xbt_socket_peer_name(from)); + } + + /* a row brodcast */ + if (myrow == step) { + XBT_VERB("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow); + for (l = 1; l < PROC_MATRIX_SIZE; l++) { + XBT_VERB("ROW: Send to %s", + xbt_socket_peer_name(socket_line[l - 1])); + gras_msg_send(socket_line[l - 1], "dataA", &mydataA); } - bC+=bA*bB; - INFO1(">>>>>>>> My BC = %.3g",bC); - - /* send a ack msg to master */ - - gras_msg_send(from,gras_msgtype_by_name("step_ack"),&step); - - INFO1("Send ack to master for to end %d th step",step); + xbt_matrix_free(bA); + bA = xbt_matrix_new_sub(mydataA, + submatrix_size, submatrix_size, 0, 0, NULL); + } else { + xbt_matrix_free(bA); + my_gras_msg_wait(1200, "dataA", &from, &bA, + "Can't get a data message from row"); + XBT_VERB("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow, + xbt_socket_peer_name(from)); } - if(step==PROC_MATRIX_SIZE-1) break; - - } while (step < PROC_MATRIX_SIZE); - /* wait Message from master to send the result */ - - result.value=bC; - result.i=myline; - result.j=myrow; - - TRY { - gras_msg_wait(600,gras_msgtype_by_name("ask_result"), - &from,&result_ack); - } CATCH(e) { - RETHROW0("I Can't get a data message from line : %s"); + xbt_matrix_double_addmult(bA, bB, bC); + } + /* send Result to master */ + result.C = bC; + result.linepos = myline; + result.rowpos = myrow; + TRY { - gras_msg_send(from, gras_msgtype_by_name("result"),&result); - } CATCH(e) { - // gras_socket_close(from); - RETHROW0("Failed to send PING to server: %s"); + gras_msg_send(master, "result", &result); + } + CATCH_ANONYMOUS { + RETHROWF("Failed to send answer to server: %s"); } - INFO3(">>>>>>>> Result: %.3f sent to %s:%d <<<<<<<<", - bC, - gras_socket_peer_name(from),gras_socket_peer_port(from)); + XBT_VERB(">>>>>>>> Result sent to %s:%d <<<<<<<<", + xbt_socket_peer_name(master), xbt_socket_peer_port(master)); /* Free the allocated resources, and shut GRAS down */ - gras_socket_close(from); + + xbt_matrix_free(bA); + xbt_matrix_free(bB); + xbt_matrix_free(bC); + + xbt_matrix_free(mydataA); + xbt_matrix_free(mydataB); + /* FIXME: some are said to be unknown + gras_socket_close(master); + gras_socket_close(from); + for (l=0; l < PROC_MATRIX_SIZE-1; l++) { + if (socket_line[l]) + gras_socket_close(socket_line[l]); + if (socket_row[l]) + gras_socket_close(socket_row[l]); + } */ + + return 0; +} + +int slave(int argc, char *argv[]) +{ + xbt_socket_t mysock; + xbt_socket_t master = NULL; + int rank; + + /* Init the GRAS's infrastructure */ + gras_init(&argc, argv); + amok_pm_init(); + if (argc != 3 && argc != 2) + xbt_die("Usage: slave masterhost:masterport [rank]"); + if (argc == 2) + rank = -1; + else + rank = atoi(argv[2]); + + /* Register the known messages and my callback */ + register_messages(); + gras_cb_register("pmm_slave", pmm_worker_cb); + + /* Create the connexions */ + mysock = gras_socket_server_range(3000, 9999, 0, 0); + XBT_INFO("Sensor %d starting", rank); + while (!(master = try_gras_socket_client_from_string(argv[1]))) + gras_os_sleep(0.5); + + /* Join and run the group */ + rank = amok_pm_group_join(master, "pmm"); + amok_pm_mainloop(600); + + /* housekeeping */ + gras_socket_close(mysock); + // gras_socket_close(master); Unknown gras_exit(); - INFO0("Done."); return 0; -} /* end_of_slave */ +} /* end_of_slave */