X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/7e081f749e69ecc10ea0274b33ab3b8a5b4d6a0b..4074cbf2951df85b8ccaf4f2b49c86e140971dfd:/examples/gras/pmm/pmm.c diff --git a/examples/gras/pmm/pmm.c b/examples/gras/pmm/pmm.c index c0320c4d33..ccd06b2f15 100755 --- a/examples/gras/pmm/pmm.c +++ b/examples/gras/pmm/pmm.c @@ -1,55 +1,63 @@ -/* pmm - paralel matrix multiplication "double diffusion" */ +/* $Id$ */ +/* pmm - parallel matrix multiplication "double diffusion" */ -/* Copyright (c) 2006- Ahmed Harbaoui. All rights reserved. */ +/* Copyright (c) 2006 Ahmed Harbaoui. */ +/* Copyright (c) 2006 Martin Quinson. */ +/* All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ #include "gras.h" -#define MATRIX_SIZE 3 -#define SLAVE_COUNT 9 +#include "xbt/matrix.h" +#include "amok/peermanagement.h" + +#define PROC_MATRIX_SIZE 3 +#define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1 +#define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE) + +#define DATA_MATRIX_SIZE 9 +const int submatrix_size = DATA_MATRIX_SIZE/PROC_MATRIX_SIZE; XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Parallel Matrix Multiplication"); -GRAS_DEFINE_TYPE(s_matrix,struct s_matrix { - int rows; - int cols; - double *data GRAS_ANNOTE(size, rows*cols); -};) - typedef struct s_matrix matrix_t; - - /* struct for recovering results */ - GRAS_DEFINE_TYPE(s_result,struct s_result { - int i; - int j; - double value; - }); +/* struct for recovering results */ +GRAS_DEFINE_TYPE(s_result,struct s_result { + int linepos; + int rowpos; + xbt_matrix_t C GRAS_ANNOTE(subtype,double); +}); typedef struct s_result result_t; /* struct to send initial data to slave */ -GRAS_DEFINE_TYPE(s_init_data,struct s_init_data { - int myrow; - int mycol; - double a; - double b; +GRAS_DEFINE_TYPE(s_pmm_assignment,struct s_pmm_assignment { + int linepos; + int rowpos; + xbt_peer_t line[NEIGHBOR_COUNT]; + xbt_peer_t row[NEIGHBOR_COUNT]; + xbt_matrix_t A GRAS_ANNOTE(subtype,double); + xbt_matrix_t B GRAS_ANNOTE(subtype,double); }); -typedef struct s_init_data init_data_t; +typedef struct s_pmm_assignment s_pmm_assignment_t; /* register messages which may be sent (common to client and server) */ static void register_messages(void) { gras_datadesc_type_t result_type; - gras_datadesc_type_t init_data_type; + gras_datadesc_type_t pmm_assignment_type; + + gras_datadesc_set_const("NEIGHBOR_COUNT",NEIGHBOR_COUNT); result_type=gras_datadesc_by_symbol(s_result); - init_data_type=gras_datadesc_by_symbol(s_init_data); + pmm_assignment_type=gras_datadesc_by_symbol(s_pmm_assignment); - gras_msgtype_declare("result", result_type); // recieve a final result from slave - gras_msgtype_declare("init_data", init_data_type); // send from master to slave to initialize data bA,bB - - gras_msgtype_declare("ask_result", gras_datadesc_by_name("int")); // send from master to slave to ask a final result - gras_msgtype_declare("step", gras_datadesc_by_name("int"));// send from master to slave to indicate the begining of step - gras_msgtype_declare("step_ack", gras_datadesc_by_name("int"));//send from slave to master to indicate the end of the current step - gras_msgtype_declare("dataA", gras_datadesc_by_name("double"));// send data between slave - gras_msgtype_declare("dataB", gras_datadesc_by_name("double"));// send data between slave + /* receive a final result from slave */ + gras_msgtype_declare("result", result_type); + + /* send from master to slave to assign a position and some data */ + gras_msgtype_declare("pmm_slave", pmm_assignment_type); + + /* send data between slaves */ + gras_msgtype_declare("dataA", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL)); + gras_msgtype_declare("dataB", gras_datadesc_matrix(gras_datadesc_by_name("double"),NULL)); } /* Function prototypes */ @@ -63,177 +71,139 @@ int master (int argc,char *argv[]); /* Global private data */ typedef struct { - int nbr_col,nbr_row; + int nbr_row,nbr_line; int remaining_step; int remaining_ack; } master_data_t; -/*** Function initilaze matrixs ***/ +int master (int argc,char *argv[]) { -static void initmatrix(matrix_t *X){ int i; - for(i=0 ; i<(X->rows)*(X->cols); i++) - X->data[i]=1.0;//*rand()/(RAND_MAX+1.0); -} /* end_of_initmatrixs */ - -/*** Function Scatter Sequentiel ***/ - -static void scatter(){ - -}/* end_of_Scatter */ - -/*** Function: Scatter // ***/ - -static void scatter_parl(){ - -}/* end_of_Scatter // */ - -/*** Function: multiplication ***/ - -static void multiplication(){ - -}/* end_of_multiplication */ - -/*** Function: gather ***/ - -static void gather(){ + xbt_matrix_t A,B,C; + result_t result; -}/* end_of_gather */ + gras_socket_t from; -/*** Function: Display Matrix ***/ + xbt_dynar_t peers; /* group of slaves */ + xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */ + gras_socket_t socket[SLAVE_COUNT]; /* sockets for brodcast to slaves */ -static void display(matrix_t X){ + /* Init the GRAS's infrastructure */ + gras_init(&argc, argv); + amok_pm_init(); + register_messages(); + + /* Initialize data matrices */ + A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE); + B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE); + C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE,DATA_MATRIX_SIZE); - int i,j,t=0; - - printf(" "); - for(j=0;j1, "Usage: master "); + gras_socket_server(atoi(argv[1])); + peers=amok_pm_group_new("pmm"); + INFO0("Wait for peers for 10 sec"); + gras_msg_handleall(10); /* friends, we're ready. Come and play */ + INFO1("Got %ld pals",xbt_dynar_length(peers)); + + for (i=0; + iname,grid[i]->port); + INFO2("Connected to %s:%d.",grid[i]->name,grid[i]->port); } - printf(" --"); - for(j=0;jname,h->port); + free(h); + } - /* Init the GRAS's infrastructure */ - gras_init(&argc, argv); - gras_socket_t socket[MATRIX_SIZE*MATRIX_SIZE]; /* sockets for brodcast to other slave */ + /* Assign job to slaves */ + int row=0, line=0; + INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication"); + for(i=0 ; i= PROC_MATRIX_SIZE) { + row=0; + line++; + } + + gras_msg_send(socket[i],gras_msgtype_by_name("pmm_slave"),&assignment); + xbt_matrix_free(assignment.A); + xbt_matrix_free(assignment.B); } - /* wait for results */ - for( i=1;i< argc;i++){ - gras_msg_wait(600,gras_msgtype_by_name("result"),&from,&result); - C.data[(result.i-1)*MATRIX_SIZE+(result.j-1)]=result.value; + + /* (have a rest while the slave perform the multiplication) */ + + /* Retrieve the results */ + for( i=0;i< SLAVE_COUNT;i++){ + gras_msg_wait(6000,gras_msgtype_by_name("result"),&from,&result); + VERB2("%d slaves are done already. Waiting for %d",i+1, SLAVE_COUNT); + xbt_matrix_copy_values(C,result.C, submatrix_size,submatrix_size, + submatrix_size*result.linepos, + submatrix_size*result.rowpos, + 0,0,NULL); + xbt_matrix_free(result.C); } /* end of gather */ - INFO0 ("The Result of Multiplication is :"); - display(C); + if (DATA_MATRIX_SIZE < 30) { + INFO0 ("The Result of Multiplication is :"); + xbt_matrix_dump(C,"C:res",0,xbt_matrix_dump_display_double); + } else { + INFO1("Matrix size too big (%d>30) to be displayed here",DATA_MATRIX_SIZE); + } + + amok_pm_group_shutdown ("pmm"); /* Ok, we're out of here */ + + for(i=0; iname, + assignment.line[i]->port); + xbt_peer_free(assignment.line[i]); } - - /* Register the known messages */ - register_messages(); - - /* Recover my initialized Data and My Position*/ - init_data_t mydata; - INFO0("wait for init Data"); - TRY { - gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata); - } CATCH(e) { - RETHROW0("I Can't get a init Data message from master : %s"); + for (i=0 ; iname, + assignment.row[i]->port); + xbt_peer_free(assignment.row[i]); } - myrow=mydata.myrow; - mycol=mydata.mycol; - mydataA=mydata.a; - mydataB=mydata.b; - INFO4("Recive MY POSITION (%d,%d) and MY INIT DATA ( A=%.3g | B=%.3g )", - myrow,mycol,mydataA,mydataB); - step=1; - - do { //repeat until compute Cb - step=MATRIX_SIZE+1; // just intilization for loop + + for (step=0; step Myrow(%d)",step,myrow); - TRY { - gras_msg_wait(600,gras_msgtype_by_name("dataB"), - &from,&bB); - } CATCH(e) { - RETHROW0("I Can't get a data message from row : %s"); - } - INFO2("Recive data B (%.3g) from my neighbor: %s",bB,gras_socket_peer_name(from)); - } - /* a column brodcast */ - if(mycol==step){ - for (l=1;l < MATRIX_SIZE ;l++){ - gras_msg_send(socket_row[l-1],gras_msgtype_by_name("dataA"), &mydataA); - bA=mydataA; - INFO1("send my data A (%.3g) to my (horizontal) neighbors",bA); - } + /* a line brodcast */ + if(myline==step){ + INFO3("LINE: step(%d) = Myline(%d). Broadcast my data (myport=%d).", + step,myline,gras_os_myport()); + for (l=0;l < PROC_MATRIX_SIZE-1 ;l++) { + INFO2("LINE: Send to %s:%d", + gras_socket_peer_name(socket_row[l]), + gras_socket_peer_port(socket_row[l])); + gras_msg_send(socket_row[l], + gras_msgtype_by_name("dataB"), + &mydataB); + } + + + xbt_matrix_free(bB); + bB = xbt_matrix_new_sub(mydataB, + submatrix_size,submatrix_size, + 0,0,NULL); + } else { + TRY { + xbt_matrix_free(bB); + gras_msg_wait(600,gras_msgtype_by_name("dataB"),&from,&bB); + } CATCH(e) { + RETHROW0("Can't get a data message from line : %s"); } + INFO4("LINE: step(%d) <> Myline(%d). Receive data from %s:%d",step,myline, + gras_socket_peer_name(from), gras_socket_peer_port(from)); + } - if(mycol != step){ - TRY { - gras_msg_wait(1200,gras_msgtype_by_name("dataA"), - &from,&bA); - } CATCH(e) { - RETHROW0("I Can't get a data message from column : %s"); - } - INFO2("Recive data A (%.3g) from my neighbor : %s ",bA,gras_socket_peer_name(from)); + /* a row brodcast */ + if (myrow==step) { + INFO2("ROW: step(%d)=myrow(%d). Broadcast my data",step,myrow); + for (l=1;l < PROC_MATRIX_SIZE ; l++) { + INFO2("ROW: Send to %s:%d", + gras_socket_peer_name(socket_line[l-1]), + gras_socket_peer_port(socket_line[l-1])); + gras_msg_send(socket_line[l-1],gras_msgtype_by_name("dataA"), &mydataA); + } + xbt_matrix_free(bA); + bA = xbt_matrix_new_sub(mydataA, + submatrix_size,submatrix_size, + 0,0,NULL); + } else { + TRY { + xbt_matrix_free(bA); + gras_msg_wait(1200,gras_msgtype_by_name("dataA"), &from,&bA); + } CATCH(e) { + RETHROW0("Can't get a data message from row : %s"); } - bC+=bA*bB; - INFO1(">>>>>>>> My BC = %.3g",bC); - - /* send a ack msg to master */ - - gras_msg_send(from,gras_msgtype_by_name("step_ack"),&step); - - INFO1("Send ack to master for to end %d th step",step); + INFO3("ROW: step(%d)<>myrow(%d). Receive data from %s",step,myrow, + gras_socket_peer_name(from)); } - if(step==MATRIX_SIZE-1) break; - - } while (step < MATRIX_SIZE); - /* wait Message from master to send the result */ - - result.value=bC; - result.i=myrow; - result.j=mycol; + xbt_matrix_double_addmult(bA,bB,bC); + + }; + /* send Result to master */ + result.C=bC; + result.linepos=myline; + result.rowpos=myrow; + TRY { - gras_msg_wait(600,gras_msgtype_by_name("ask_result"), - &from,&result_ack); - } CATCH(e) { - RETHROW0("I Can't get a data message from row : %s"); - } - /* send Result to master */ - TRY { - gras_msg_send(from, gras_msgtype_by_name("result"),&result); + gras_msg_send(master, gras_msgtype_by_name("result"),&result); } CATCH(e) { - // gras_socket_close(from); - RETHROW0("Failed to send PING to server: %s"); + RETHROW0("Failed to send answer to server: %s"); } - INFO3(">>>>>>>> Result: %.3f sent to %s:%d <<<<<<<<", - bC, - gras_socket_peer_name(from),gras_socket_peer_port(from)); + INFO2(">>>>>>>> Result sent to %s:%d <<<<<<<<", + gras_socket_peer_name(master),gras_socket_peer_port(master)); /* Free the allocated resources, and shut GRAS down */ + + xbt_matrix_free(bA); + xbt_matrix_free(bB); + xbt_matrix_free(bC); + + xbt_matrix_free(mydataA); + xbt_matrix_free(mydataB); + gras_socket_close(master); gras_socket_close(from); + /* FIXME: some are said to be unknown + for (l=0; l < PROC_MATRIX_SIZE-1; l++) { + if (socket_line[l]) + gras_socket_close(socket_line[l]); + if (socket_row[l]) + gras_socket_close(socket_row[l]); + }*/ + + return 1; +} + +int slave(int argc,char *argv[]) { + gras_socket_t mysock; + gras_socket_t master; + + /* Init the GRAS's infrastructure */ + gras_init(&argc, argv); + amok_pm_init(); + + /* Register the known messages and my callback */ + register_messages(); + gras_cb_register(gras_msgtype_by_name("pmm_slave"),pmm_worker_cb); + + /* Create the connexions */ + mysock = gras_socket_server_range(3000,9999,0,0); + INFO1("Sensor starting (on port %d)",gras_os_myport()); + gras_os_sleep(2); /* let the master get ready */ + master = gras_socket_client_from_string(argv[1]); + + /* Join and run the group */ + amok_pm_group_join(master,"pmm"); + amok_pm_mainloop(600); + + /* housekeeping */ + gras_socket_close(mysock); + // gras_socket_close(master); Unknown gras_exit(); - INFO0("Done."); return 0; } /* end_of_slave */